mirror of
https://github.com/lobehub/lobe-chat.git
synced 2026-06-14 03:30:19 +00:00
Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a6c2a57f54 | |||
| eedf46a11d | |||
| ff61f4b3fa | |||
| 192111840c | |||
| 837a3daa58 | |||
| 5f6f053039 | |||
| 775be47513 | |||
| 2f265a9307 | |||
| 0fa2e2349c | |||
| 930344ae23 | |||
| 538195dfb4 |
@@ -14,7 +14,7 @@ In `NODE_ENV=development`, `AgentRuntimeService.executeStep()` automatically rec
|
||||
|
||||
**Data flow**: executeStep loop -> build `StepPresentationData` -> write partial snapshot to disk -> on completion, finalize to `.agent-tracing/{timestamp}_{traceId}.json`
|
||||
|
||||
**Context engine capture**: In `RuntimeExecutors.ts`, the `call_llm` executor emits a `context_engine_result` event after `serverMessagesEngine()` processes messages. This event carries the full `contextEngineInput` (DB messages, systemRole, model, knowledge, tools, userMemory, etc.) and the processed `output` messages (the final LLM payload).
|
||||
**Context engine capture**: In `RuntimeExecutors.ts`, the `call_llm` executor calls `ctx.tracingContextEngine(input, output)` after `serverMessagesEngine()` processes messages. `AgentRuntimeService.executeStep` buffers the call per step and forwards it to `OperationTraceRecorder.appendStep` as the typed `contextEngine` field. CE flows through this side channel rather than the `events` array so its heavy payload (agentDocuments, systemRole, …) never enters the Redis state pipeline (LOBE-9110).
|
||||
|
||||
## Package Location
|
||||
|
||||
@@ -199,9 +199,10 @@ interface StepSnapshot {
|
||||
messages?: any[]; // DB messages before step
|
||||
context?: { phase: string; payload?: unknown; stepContext?: unknown };
|
||||
events?: Array<{ type: string; [key: string]: unknown }>;
|
||||
// context_engine_result event contains:
|
||||
// input: full contextEngineInput (messages, systemRole, model, knowledge, tools, userMemory, ...)
|
||||
// output: processed messages array (final LLM payload)
|
||||
contextEngine?: {
|
||||
input?: unknown; // contextEngineInput minus messages + toolsConfig (reconstructible from baseline)
|
||||
output?: unknown; // processed messages array (final LLM payload)
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
@@ -216,5 +217,5 @@ When using `--messages`, the output shows three sections (if context engine data
|
||||
## Integration Points
|
||||
|
||||
- **Recording**: `src/server/services/agentRuntime/AgentRuntimeService.ts` — in the `executeStep()` method, after building `stepPresentationData`, writes partial snapshot in dev mode
|
||||
- **Context engine event**: `src/server/modules/AgentRuntime/RuntimeExecutors.ts` — in `call_llm` executor, after `serverMessagesEngine()` returns, emits `context_engine_result` event
|
||||
- **Context engine capture**: `src/server/modules/AgentRuntime/RuntimeExecutors.ts` — in `call_llm` executor, after `serverMessagesEngine()` returns, calls `ctx.tracingContextEngine(input, output)`. `AgentRuntimeService.executeStep` buffers it per step and passes it to `traceRecorder.appendStep` as the typed `contextEngine` field (kept off the `events` array to stay out of Redis state).
|
||||
- **Store**: `FileSnapshotStore` reads/writes to `.agent-tracing/` relative to `process.cwd()`
|
||||
|
||||
@@ -0,0 +1,93 @@
|
||||
# LobeHub gateway streaming + tab-switch test harness
|
||||
|
||||
Captures store + DOM state at 200ms intervals so we can prove or disprove
|
||||
claims like "切回 tab 后消息回到了很早以前". Built for gateway-mode chat but
|
||||
works for any LobeHub streaming session.
|
||||
|
||||
## Files
|
||||
|
||||
`scripts/agent-gateway/`
|
||||
|
||||
| File | Role |
|
||||
| --------------- | ---------------------------------------------------------------- |
|
||||
| `probe.js` | Injects a 200ms sampler + `__PROBE_EVENT` marker + `__switchTab` |
|
||||
| `probe-dump.js` | Stops the sampler and returns `{events, samples}` as JSON string |
|
||||
| `tab-switch.js` | Runs N round-trip switches between two tabs, marks each step |
|
||||
| `analyze.mjs` | Node post-processor: timeline + regression detection |
|
||||
|
||||
## Standard workflow
|
||||
|
||||
```bash
|
||||
# 1. Start Electron with CDP
|
||||
./.agents/skills/local-testing/scripts/electron-dev.sh start
|
||||
|
||||
# 2. Navigate to a chat, switch runtime to Cloud Sandbox (gateway mode)
|
||||
|
||||
# 3. Install the probe + helpers
|
||||
agent-browser --cdp 9222 eval --stdin \
|
||||
< .agents/skills/local-testing/scripts/agent-gateway/probe.js
|
||||
|
||||
# 4. Send a tool-call message — manually or via type+press
|
||||
agent-browser --cdp 9222 eval "window.__PROBE_EVENT('SENT')"
|
||||
|
||||
# 5. Run the multi-switch driver (auto-picks active tab as BACK and the
|
||||
# rightmost inactive tab as AWAY — edit ROUND_TRIPS / DWELL_MS in the
|
||||
# file if you want different timing)
|
||||
agent-browser --cdp 9222 eval --stdin \
|
||||
< .agents/skills/local-testing/scripts/agent-gateway/tab-switch.js
|
||||
|
||||
# 6. Wait for streaming to finish, then dump
|
||||
agent-browser --cdp 9222 eval --stdin \
|
||||
< .agents/skills/local-testing/scripts/agent-gateway/probe-dump.js \
|
||||
> /tmp/probe.json
|
||||
|
||||
# 7. Analyze
|
||||
node .agents/skills/local-testing/scripts/agent-gateway/analyze.mjs /tmp/probe.json
|
||||
```
|
||||
|
||||
The analyzer prints three sections: EVENTS, TIMELINE, REGRESSIONS. If
|
||||
REGRESSIONS is non-empty it means content/reasoning/childN dropped on the
|
||||
same topic — the symptom users describe.
|
||||
|
||||
## What the probe tracks (and why)
|
||||
|
||||
`chat.messagesMap` only stores the top-level `assistantGroup` shell. The
|
||||
actual streamed content, reasoning, and tool calls live in
|
||||
`assistantGroup.children: AssistantContentBlock[]`. Any probe that only
|
||||
reads `m.content` / `m.reasoning` will see zeros throughout streaming and
|
||||
miss everything that matters. probe.js walks both levels and sums:
|
||||
|
||||
- `cT` total content length
|
||||
- `rT` total reasoning length
|
||||
- `toolT` total tool-call count
|
||||
- `childN` number of content blocks
|
||||
|
||||
Plus DOM-side signals (`domLen`, search/crawl indicator counts) so you can
|
||||
tell store-side regressions apart from render-side regressions.
|
||||
|
||||
## Gotchas
|
||||
|
||||
- **Optimistic new-topic state.** Before the first chunk lands, messages
|
||||
live under the `<scope>_new` key with `tmp_*` ids and no `topicId` field.
|
||||
probe.js falls back to those when `activeTopicId` is null.
|
||||
- **Reasoning resets to 0 are not bugs.** When the assistant finishes
|
||||
thinking and starts tool-use or text, the streaming reasoning buffer
|
||||
empties and the finalised reasoning gets sealed into a completed block.
|
||||
Filter these out manually if needed.
|
||||
- **DOM length jitters by a handful of chars** because counters like "(10)"
|
||||
in tool-call labels change as results arrive. analyze.mjs only flags
|
||||
`domLen` drops greater than 100 chars to ignore that noise.
|
||||
- **Never identify tabs by innerText.** The active tab's text embeds a
|
||||
` · <agent name>` suffix, so a search like `'LobeHub Growth'` matches the
|
||||
active tab when the active agent happens to be LobeHub Growth — and you
|
||||
end up clicking the tab you're already on. probe.js uses the stable
|
||||
`data-contextmenu-trigger` attribute (a React `useId()` value that's set
|
||||
per-tab and survives focus changes) plus `data-active="true"` to mark
|
||||
the active one. Helpers exposed:
|
||||
`__listTabs()` / `__clickTabByKey(key)` / `__clickTabByIndex(i)` /
|
||||
`__activeTabKey()`.
|
||||
- **`tab-switch.js` fires-and-forgets.** The IIFE kicks off an async loop
|
||||
and returns immediately so the agent-browser CLI eval doesn't blow past
|
||||
its default 25 s timeout. Wait on the `SWITCH_LOOP_DONE` event marker
|
||||
before dumping. Re-running while a loop is in flight is refused — the
|
||||
chaotic data from overlapping runs is not worth debugging.
|
||||
@@ -0,0 +1,243 @@
|
||||
// Analyzer for probe-events dumps. Reads a JSON file produced by `run.ts dump`
|
||||
// and prints a layered breakdown:
|
||||
//
|
||||
// 1. STREAM EVENTS — every non-chunk WS/SSE event in receipt order
|
||||
// 2. CHUNKS SUMMARY — collapsed per-step chunk counts (otherwise floods)
|
||||
// 3. ACTION CALLS — replaceMessages / refreshMessages / MARK:* with stack
|
||||
// 4. CORRELATION — calls ↔ nearest stream event within ±300ms
|
||||
// 5. PER-KEY ASSISTANT GROWTH — for each messagesMap key, when the leading
|
||||
// assistant message's cLen / rLen actually moves (this is what reveals
|
||||
// "chunks arrived but the message never grew" regressions)
|
||||
// 6. ROLLBACKS — msgN / childN / role drops in the active-topic timeline
|
||||
//
|
||||
// Usage:
|
||||
// bun run .agents/skills/local-testing/scripts/agent-gateway/analyze-events.ts <dump.json>
|
||||
|
||||
import { readFileSync } from 'node:fs';
|
||||
|
||||
import type {
|
||||
ProbeActionCall,
|
||||
ProbeDump,
|
||||
ProbeMessageSummary,
|
||||
ProbeStreamEvent,
|
||||
ProbeTimelineSample,
|
||||
} from './types';
|
||||
|
||||
const file = process.argv[2];
|
||||
if (!file) {
|
||||
console.error('usage: bun run analyze-events.ts <dump.json>');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const raw = readFileSync(file, 'utf8');
|
||||
// agent-browser eval --stdin wraps return values in quotes when the value is
|
||||
// a string — so the JSON file may be double-encoded depending on how it was
|
||||
// captured. Handle both.
|
||||
const parsedOnce = JSON.parse(raw) as ProbeDump | string;
|
||||
const dump: ProbeDump = typeof parsedOnce === 'string' ? JSON.parse(parsedOnce) : parsedOnce;
|
||||
|
||||
const { streamEvents = [], actionCalls = [], timeline = [] } = dump;
|
||||
|
||||
const pad = (v: unknown, n: number) => String(v).padStart(n);
|
||||
|
||||
// ── META ───────────────────────────────────────────────────────────
|
||||
console.log('=== META ===');
|
||||
console.log(` events: ${streamEvents.length}`);
|
||||
console.log(` calls: ${actionCalls.length}`);
|
||||
console.log(` timeline: ${timeline.length}`);
|
||||
|
||||
// ── 1. STREAM EVENTS (non-chunk) ───────────────────────────────────
|
||||
const nonChunkEvents = streamEvents.filter((e) => e.type !== 'stream_chunk');
|
||||
const chunkEvents = streamEvents.filter((e) => e.type === 'stream_chunk');
|
||||
|
||||
console.log(
|
||||
`\n=== STREAM EVENTS (${nonChunkEvents.length} non-chunk + ${chunkEvents.length} chunks elided) ===`,
|
||||
);
|
||||
for (const e of nonChunkEvents) {
|
||||
const dataStr = e.dataKeys?.length ? ` [${e.dataKeys.join(',')}]` : '';
|
||||
const data = e.data as Record<string, unknown> | undefined;
|
||||
const uiHint = data?.uiMessagesPreview
|
||||
? ` uiPreview=${JSON.stringify(data.uiMessagesPreview)}`
|
||||
: data?.uiMessagesTotal
|
||||
? ` uiTotal=${data.uiMessagesTotal}`
|
||||
: '';
|
||||
const phaseHint = data?.phase ? ` phase=${data.phase}` : '';
|
||||
const extra = e.serverType ? ` serverType=${e.serverType}` : '';
|
||||
console.log(
|
||||
` t=${pad(e.t, 7)} [${(e.transport ?? '?').padEnd(3)}] step=${pad(e.stepIndex ?? '-', 2)} ` +
|
||||
`type=${(e.type ?? '').padEnd(22)} op=${e.opIdTail ?? '-'}${phaseHint}${uiHint}${extra}${dataStr}`,
|
||||
);
|
||||
}
|
||||
|
||||
// ── 2. CHUNK SUMMARY ───────────────────────────────────────────────
|
||||
console.log('\n=== CHUNKS SUMMARY (per step / chunkType) ===');
|
||||
const chunkBuckets = new Map<string, { count: number; firstT: number; lastT: number }>();
|
||||
for (const c of chunkEvents) {
|
||||
const data = c.data as Record<string, unknown> | undefined;
|
||||
const ct = (data?.chunkType as string | undefined) ?? '?';
|
||||
const key = `step=${c.stepIndex ?? '-'} chunkType=${ct.padEnd(8)} op=${c.opIdTail}`;
|
||||
const slot = chunkBuckets.get(key);
|
||||
if (slot) {
|
||||
slot.count += 1;
|
||||
slot.lastT = c.t;
|
||||
} else {
|
||||
chunkBuckets.set(key, { count: 1, firstT: c.t, lastT: c.t });
|
||||
}
|
||||
}
|
||||
for (const [k, v] of chunkBuckets) {
|
||||
console.log(` ${k} count=${pad(v.count, 4)} t=${pad(v.firstT, 7)}..${pad(v.lastT, 7)}`);
|
||||
}
|
||||
|
||||
// ── 3. ACTION CALLS ───────────────────────────────────────────────
|
||||
console.log('\n=== ACTION CALLS (replace/refresh/MARK) ===');
|
||||
for (const c of actionCalls) {
|
||||
if (c.name?.startsWith('MARK:')) {
|
||||
console.log(` t=${pad(c.t, 7)} ${c.name}`);
|
||||
continue;
|
||||
}
|
||||
const snapshot = (c.args as any)?.snapshot as
|
||||
| Array<{ id: string; role: string; cLen: number; rLen: number }>
|
||||
| undefined;
|
||||
const snapStr = snapshot?.length
|
||||
? ' snapshot=' + snapshot.map((m) => `${m.id}:${m.role}/c${m.cLen}/r${m.rLen}`).join(' | ')
|
||||
: '';
|
||||
const summary =
|
||||
c.name === 'replaceMessages'
|
||||
? `count=${c.args?.count} action=${(c.args?.params as any)?.action ?? '-'}${snapStr}`
|
||||
: c.name === 'refreshMessages'
|
||||
? `ctx=${JSON.stringify(c.args?.context)}`
|
||||
: c.error
|
||||
? `error=${c.error}`
|
||||
: '';
|
||||
console.log(` t=${pad(c.t, 7)} ${c.name.padEnd(20)} ${summary}`);
|
||||
if (c.stack) {
|
||||
const frames = c.stack
|
||||
.split(' ← ')
|
||||
.filter((f) => !!f && !f.includes('Object.<anonymous>'))
|
||||
.slice(0, 3);
|
||||
for (const f of frames) console.log(` ↳ ${f}`);
|
||||
}
|
||||
}
|
||||
|
||||
// ── 4. CORRELATION ────────────────────────────────────────────────
|
||||
function nearestEventForCall(
|
||||
call: ProbeActionCall,
|
||||
windowMs = 300,
|
||||
): { event: ProbeStreamEvent; delta: number } | null {
|
||||
let best: ProbeStreamEvent | null = null;
|
||||
let bestDelta = Infinity;
|
||||
for (const e of streamEvents) {
|
||||
const d = Math.abs(e.t - call.t);
|
||||
if (d < bestDelta && d <= windowMs) {
|
||||
bestDelta = d;
|
||||
best = e;
|
||||
}
|
||||
}
|
||||
return best ? { event: best, delta: bestDelta } : null;
|
||||
}
|
||||
|
||||
console.log('\n=== CORRELATION (replace/refresh ↔ nearest event within ±300ms) ===');
|
||||
for (const c of actionCalls) {
|
||||
if (c.name !== 'refreshMessages' && c.name !== 'replaceMessages') continue;
|
||||
const hit = nearestEventForCall(c);
|
||||
if (hit) {
|
||||
const phase = (hit.event.data as Record<string, unknown> | undefined)?.phase;
|
||||
console.log(
|
||||
` t=${pad(c.t, 7)} ${c.name.padEnd(16)} ← Δ${pad(hit.delta, 4)}ms ${hit.event.type}` +
|
||||
(phase ? ` phase=${phase}` : ''),
|
||||
);
|
||||
} else {
|
||||
console.log(` t=${pad(c.t, 7)} ${c.name.padEnd(16)} ← (no event nearby — external trigger)`);
|
||||
}
|
||||
}
|
||||
|
||||
// ── 5. PER-KEY ASSISTANT GROWTH ───────────────────────────────────
|
||||
// For each messagesMap key, find the trailing assistant message and report
|
||||
// the points in time where its cLen / rLen actually changed. If the timeline
|
||||
// shows chunks arriving but the assistant cLen never moves, that's the
|
||||
// signature of "dispatch queue blocked / messageId mismatch".
|
||||
console.log('\n=== PER-KEY ASSISTANT GROWTH ===');
|
||||
const keysEverSeen = new Set<string>();
|
||||
for (const s of timeline) for (const k of Object.keys(s.byKey ?? {})) keysEverSeen.add(k);
|
||||
|
||||
for (const key of keysEverSeen) {
|
||||
console.log(`\n key=${key}`);
|
||||
let lastSig: string | null = null;
|
||||
for (const s of timeline) {
|
||||
const slot = s.byKey?.[key];
|
||||
if (!slot) continue;
|
||||
const last = slot.msgs.at(-1) as ProbeMessageSummary | undefined;
|
||||
if (!last) continue;
|
||||
const sig = `${last.id}|c${last.cLen}|r${last.rLen}|n${slot.n}`;
|
||||
if (sig === lastSig) continue;
|
||||
lastSig = sig;
|
||||
console.log(
|
||||
` t=${pad(s.t, 7)} msgN=${pad(slot.n, 3)} ` +
|
||||
`lastAssistant=${last.id} cLen=${pad(last.cLen, 5)} rLen=${pad(last.rLen, 5)}` +
|
||||
` runOps=${s.runOps}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// ── 6. ROLLBACKS (active-topic msgN / childN / role drops) ─────────
|
||||
console.log('\n=== ROLLBACKS (active-topic msgN / childN / role drops) ===');
|
||||
let prev: ProbeTimelineSample | null = null;
|
||||
const rollbacks: Array<{ t: number; topic: string | null; drops: string[] }> = [];
|
||||
|
||||
const flatten = (s: ProbeTimelineSample) => {
|
||||
if (!s.activeTopic) return [];
|
||||
return Object.entries(s.byKey ?? {})
|
||||
.filter(([k]) => k.includes(s.activeTopic!))
|
||||
.flatMap(([, v]) => v.msgs);
|
||||
};
|
||||
|
||||
for (const s of timeline) {
|
||||
if (s.err) {
|
||||
prev = null;
|
||||
continue;
|
||||
}
|
||||
if (!prev || prev.activeTopic !== s.activeTopic) {
|
||||
prev = s;
|
||||
continue;
|
||||
}
|
||||
const prevMsgs = flatten(prev);
|
||||
const curMsgs = flatten(s);
|
||||
const drops: string[] = [];
|
||||
|
||||
if (curMsgs.length < prevMsgs.length) drops.push(`msgN ${prevMsgs.length}→${curMsgs.length}`);
|
||||
|
||||
let prevChild = 0;
|
||||
let curChild = 0;
|
||||
for (const m of prevMsgs) prevChild += m.chN ?? 0;
|
||||
for (const m of curMsgs) curChild += m.chN ?? 0;
|
||||
if (curChild < prevChild) drops.push(`childN ${prevChild}→${curChild}`);
|
||||
|
||||
const prevById = new Map(prevMsgs.map((m) => [m.id, m]));
|
||||
for (const m of curMsgs) {
|
||||
const pr = prevById.get(m.id);
|
||||
if (!pr) continue;
|
||||
if (m.cLen < pr.cLen) drops.push(`cLen[${m.id}] ${pr.cLen}→${m.cLen}`);
|
||||
if (m.rLen < pr.rLen) drops.push(`rLen[${m.id}] ${pr.rLen}→${m.rLen}`);
|
||||
}
|
||||
|
||||
if (drops.length) rollbacks.push({ t: s.t, topic: s.activeTopic, drops });
|
||||
prev = s;
|
||||
}
|
||||
|
||||
if (rollbacks.length === 0) {
|
||||
console.log(' (none)');
|
||||
} else {
|
||||
for (const r of rollbacks) {
|
||||
const nearEvent = streamEvents
|
||||
.filter((e) => Math.abs(e.t - r.t) <= 300)
|
||||
.map((e) => `${e.type}${(e.data as any)?.phase ? ':' + (e.data as any).phase : ''}`);
|
||||
const nearCall = actionCalls
|
||||
.filter((c) => Math.abs(c.t - r.t) <= 300 && !c.name?.startsWith('MARK:'))
|
||||
.map((c) => c.name);
|
||||
console.log(
|
||||
` t=${pad(r.t, 7)} topic=${r.topic} ${r.drops.join(' | ')}` +
|
||||
(nearEvent.length ? ` near-event:[${nearEvent.join(',')}]` : '') +
|
||||
(nearCall.length ? ` near-call:[${nearCall.join(',')}]` : ''),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
#!/usr/bin/env node
|
||||
// Analyze a probe dump captured by probe.js + probe-dump.js.
|
||||
//
|
||||
// node analyze.mjs /tmp/probe.json
|
||||
//
|
||||
// Prints:
|
||||
// 1. EVENTS — user-action markers with their relative timestamps
|
||||
// 2. TIMELINE — periodic samples (~1 per second + event-adjacent samples)
|
||||
// showing every interesting field; columns:
|
||||
// t(ms) | runOps | msgN | childN | content | reasoning | tools | domLen | search | crawl | topic | event
|
||||
// 3. REGRESSIONS — every place a tracked counter *dropped* on the same
|
||||
// topic between adjacent samples. A "true" UI rollback shows up as a
|
||||
// drop in content/reasoning/tools/childN/domLen without a topic change.
|
||||
//
|
||||
// Whitelisted transitions (not flagged):
|
||||
// - topic change → all drops expected (focus moved away)
|
||||
// - reasoning length 0 after content starts → reasoning gets sealed into a
|
||||
// completed sub-block; the parent's running reasoning resets to ''.
|
||||
// - msgN drop when topic transitions from `_new` placeholder to a real id.
|
||||
|
||||
import fs from 'node:fs';
|
||||
|
||||
const file = process.argv[2];
|
||||
if (!file) {
|
||||
console.error('usage: node analyze.mjs <probe.json>');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const raw = JSON.parse(fs.readFileSync(file, 'utf8'));
|
||||
// probe-dump.js wraps the payload in JSON.stringify so agent-browser returns
|
||||
// it as a single quoted string. Unwrap.
|
||||
const data = typeof raw === 'string' ? JSON.parse(raw) : raw;
|
||||
const { events, samples } = data;
|
||||
|
||||
const fmt = {
|
||||
pad(v, n) {
|
||||
return String(v).padStart(n);
|
||||
},
|
||||
};
|
||||
|
||||
console.log('=== EVENTS ===');
|
||||
for (const e of events) console.log(` t=${fmt.pad(e.t, 7)} ${e.name}`);
|
||||
|
||||
console.log(
|
||||
'\n=== TIMELINE (~1s cadence, plus event-adjacent samples) ===\n' +
|
||||
' t(ms) runOps msgN childN content reasoning tools domLen search crawl topic event',
|
||||
);
|
||||
|
||||
let lastSampledAt = -1e9;
|
||||
const eventBuckets = events.map((e) => e.t);
|
||||
for (let i = 0; i < samples.length; i++) {
|
||||
const s = samples[i];
|
||||
const nearEvent = eventBuckets.some((et) => Math.abs(et - s.t) < 110);
|
||||
if (!nearEvent && s.t - lastSampledAt < 1000) continue;
|
||||
lastSampledAt = s.t;
|
||||
|
||||
const ev = events.find((e) => Math.abs(e.t - s.t) < 110);
|
||||
const evMarker = ev ? ` ◀ ${ev.name}` : '';
|
||||
const topicSuffix = s.topicId ? s.topicId.slice(-6) : '(none)';
|
||||
const search = s.ind?.search ?? 0;
|
||||
const crawl = s.ind?.crawl ?? 0;
|
||||
console.log(
|
||||
` ${fmt.pad(s.t, 6)} ` +
|
||||
`${fmt.pad(s.runOps, 6)} ` +
|
||||
`${fmt.pad(s.msgN, 4)} ` +
|
||||
`${fmt.pad(s.childN ?? 0, 5)} ` +
|
||||
`${fmt.pad(s.cT ?? 0, 8)} ` +
|
||||
`${fmt.pad(s.rT ?? 0, 9)} ` +
|
||||
`${fmt.pad(s.toolT ?? 0, 5)} ` +
|
||||
`${fmt.pad(s.domLen ?? 0, 7)} ` +
|
||||
`${fmt.pad(search, 6)} ` +
|
||||
`${fmt.pad(crawl, 5)} ` +
|
||||
`${topicSuffix.padEnd(8)}${evMarker}`,
|
||||
);
|
||||
}
|
||||
|
||||
console.log('\n=== REGRESSIONS (same topic, value dropped) ===');
|
||||
const regressions = [];
|
||||
for (let i = 1; i < samples.length; i++) {
|
||||
const prev = samples[i - 1];
|
||||
const cur = samples[i];
|
||||
if (!cur.topicId || prev.topicId !== cur.topicId) continue;
|
||||
|
||||
const drops = [];
|
||||
if (cur.msgN < prev.msgN) drops.push(`msgN: ${prev.msgN}→${cur.msgN}`);
|
||||
if ((cur.childN ?? 0) < (prev.childN ?? 0)) drops.push(`childN: ${prev.childN}→${cur.childN}`);
|
||||
if ((cur.cT ?? 0) < (prev.cT ?? 0)) drops.push(`content: ${prev.cT}→${cur.cT}`);
|
||||
if ((cur.rT ?? 0) < (prev.rT ?? 0)) drops.push(`reasoning: ${prev.rT}→${cur.rT}`);
|
||||
if ((cur.toolT ?? 0) < (prev.toolT ?? 0)) drops.push(`tools: ${prev.toolT}→${cur.toolT}`);
|
||||
// domLen jitters by a few chars from counter labels — only flag big drops.
|
||||
if ((cur.domLen ?? 0) < (prev.domLen ?? 0) - 100) {
|
||||
drops.push(`domLen: ${prev.domLen}→${cur.domLen}`);
|
||||
}
|
||||
if (drops.length === 0) continue;
|
||||
|
||||
const nearbyEv = events.filter((e) => Math.abs(e.t - cur.t) < 600).map((e) => e.name);
|
||||
regressions.push({ t: cur.t, topic: cur.topicId.slice(-6), drops, nearbyEv });
|
||||
}
|
||||
|
||||
if (regressions.length === 0) {
|
||||
console.log(' (none)');
|
||||
} else {
|
||||
for (const r of regressions) {
|
||||
const evStr = r.nearbyEv.length ? ` near:[${r.nearbyEv.join(',')}]` : '';
|
||||
console.log(` t=${fmt.pad(r.t, 7)} topic=${r.topic} ${r.drops.join(' | ')}${evStr}`);
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`\n=== SUMMARY ===`);
|
||||
console.log(` samples: ${samples.length}`);
|
||||
console.log(` events: ${events.length}`);
|
||||
console.log(` regressions: ${regressions.length}`);
|
||||
if (samples.length) {
|
||||
const last = samples.at(-1);
|
||||
console.log(
|
||||
` final: msgN=${last.msgN} childN=${last.childN ?? 0} content=${last.cT ?? 0} ` +
|
||||
`reasoning=${last.rT ?? 0} tools=${last.toolT ?? 0} runOps=${last.runOps}`,
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
// Stop the probe and serialize collected data.
|
||||
//
|
||||
// agent-browser --cdp 9222 eval --stdin < probe-dump.js > /tmp/probe.json
|
||||
//
|
||||
// The whole thing is wrapped in a JSON.stringify so agent-browser returns it
|
||||
// as a single quoted string — the analyzer double-parses to handle that.
|
||||
|
||||
(function () {
|
||||
if (window.__PROBE_TIMER) {
|
||||
clearInterval(window.__PROBE_TIMER);
|
||||
window.__PROBE_TIMER = null;
|
||||
}
|
||||
return JSON.stringify({
|
||||
events: window.__PROBE_EVENTS || [],
|
||||
samples: window.__PROBE_SAMPLES || [],
|
||||
});
|
||||
})();
|
||||
@@ -0,0 +1,37 @@
|
||||
// Stops the events-probe timeline timer and stashes the full capture as a
|
||||
// JSON string on `window.__PROBE_LAST_DUMP_JSON`. `run.ts` wraps the bundle
|
||||
// in an IIFE that returns that global, which `agent-browser eval` prints to
|
||||
// stdout — the runner then persists it under `.agent-gateway/`.
|
||||
|
||||
import type { ProbeDump } from './types';
|
||||
|
||||
declare global {
|
||||
interface Window {
|
||||
__PROBE_LAST_DUMP_JSON?: string;
|
||||
}
|
||||
}
|
||||
|
||||
const w = window;
|
||||
|
||||
if (w.__PROBE_TIMELINE_TIMER) {
|
||||
clearInterval(w.__PROBE_TIMELINE_TIMER);
|
||||
w.__PROBE_TIMELINE_TIMER = null;
|
||||
}
|
||||
|
||||
const mutations = w.__PROBE_MUTATIONS ?? [];
|
||||
|
||||
const dump: ProbeDump & { mutations: typeof mutations } = {
|
||||
meta: {
|
||||
t0: w.__PROBE_T0 ?? 0,
|
||||
collectedAt: Date.now(),
|
||||
sampleCount: (w.__PROBE_MSG_TIMELINE ?? []).length,
|
||||
eventCount: (w.__PROBE_STREAM_EVENTS ?? []).length,
|
||||
callCount: (w.__PROBE_ACTION_CALLS ?? []).length,
|
||||
},
|
||||
streamEvents: w.__PROBE_STREAM_EVENTS ?? [],
|
||||
actionCalls: w.__PROBE_ACTION_CALLS ?? [],
|
||||
timeline: w.__PROBE_MSG_TIMELINE ?? [],
|
||||
mutations,
|
||||
};
|
||||
|
||||
w.__PROBE_LAST_DUMP_JSON = JSON.stringify(dump);
|
||||
@@ -0,0 +1,637 @@
|
||||
// LobeHub gateway raw-event-stream probe.
|
||||
//
|
||||
// Gateway-mode chats subscribe via WebSocket — NOT via the `/api/agent/stream`
|
||||
// SSE endpoint (that one belongs to the direct/client durable-agent runtime).
|
||||
// `AgentStreamClient` (`packages/agent-gateway-client/src/client.ts`) opens
|
||||
// `new WebSocket('wss://.../ws?operationId=...')`, then parses JSON frames in
|
||||
// its `onmessage` handler and re-emits `agent_event.event` objects to the
|
||||
// chat store.
|
||||
//
|
||||
// To capture the RAW gateway events before the store touches them, we wrap
|
||||
// `window.WebSocket` so that for any socket whose URL contains `operationId=`
|
||||
// we intercept the `onmessage` handler / `addEventListener('message')` and
|
||||
// log every `agent_event` frame.
|
||||
//
|
||||
// We *also* keep the `window.fetch` hook for `/api/agent/stream` so this
|
||||
// probe still works for direct-mode runs — but gateway-mode events come
|
||||
// through the WebSocket path.
|
||||
//
|
||||
// Buffers (read via `dump`):
|
||||
// __PROBE_STREAM_EVENTS — raw events parsed off the wire
|
||||
// __PROBE_ACTION_CALLS — replaceMessages / refreshMessages calls (best-effort)
|
||||
// __PROBE_MSG_TIMELINE — 200ms snapshots of every messagesMap key
|
||||
|
||||
import type {
|
||||
ProbeActionCall,
|
||||
ProbeMessageSummary,
|
||||
ProbeStreamEvent,
|
||||
ProbeTimelineSample,
|
||||
} from './types';
|
||||
|
||||
// Bundled by esbuild as an IIFE. Top-level code runs once on injection.
|
||||
|
||||
const w = window;
|
||||
|
||||
// ── Buffers ─────────────────────────────────────────────────────────
|
||||
|
||||
declare global {
|
||||
interface Window {
|
||||
__PROBE_MUTATIONS?: Array<{
|
||||
t: number;
|
||||
key: string;
|
||||
n: number;
|
||||
last?: { id: string; role: string; cLen: number; rLen: number; updatedAt?: unknown };
|
||||
prevLast?: { id: string; role: string; cLen: number; rLen: number };
|
||||
delta?: string;
|
||||
}>;
|
||||
__PROBE_STORE_UNSUB?: () => void;
|
||||
}
|
||||
}
|
||||
|
||||
const events: ProbeStreamEvent[] = (w.__PROBE_STREAM_EVENTS ??= []);
|
||||
const calls: ProbeActionCall[] = (w.__PROBE_ACTION_CALLS ??= []);
|
||||
const timeline: ProbeTimelineSample[] = (w.__PROBE_MSG_TIMELINE ??= []);
|
||||
const mutations = (w.__PROBE_MUTATIONS ??= []);
|
||||
events.length = 0;
|
||||
calls.length = 0;
|
||||
timeline.length = 0;
|
||||
mutations.length = 0;
|
||||
|
||||
const t0 = Date.now();
|
||||
w.__PROBE_T0 = t0;
|
||||
const now = (): number => Date.now() - t0;
|
||||
|
||||
// ── Helpers ─────────────────────────────────────────────────────────
|
||||
|
||||
function summarizeData(data: unknown): Record<string, unknown> | unknown {
|
||||
if (!data || typeof data !== 'object') return data;
|
||||
const src = data as Record<string, unknown>;
|
||||
const out: Record<string, unknown> = {};
|
||||
for (const k of Object.keys(src)) {
|
||||
const v = src[k];
|
||||
if (v == null) {
|
||||
out[k] = v;
|
||||
} else if (Array.isArray(v)) {
|
||||
out[k] = `Array(${v.length})`;
|
||||
if (k === 'uiMessages') {
|
||||
out.uiMessagesPreview = v.slice(0, 5).map((m: any) => ({
|
||||
id: (m.id ?? '').slice(-8),
|
||||
role: m.role,
|
||||
cLen: (m.content ?? '').length,
|
||||
children: (m.children ?? []).length,
|
||||
tools: (m.tools ?? []).length,
|
||||
reasoning: (m.reasoning?.content ?? '').length,
|
||||
}));
|
||||
out.uiMessagesTotal = v.length;
|
||||
}
|
||||
} else if (typeof v === 'object') {
|
||||
const obj = v as Record<string, unknown>;
|
||||
out[k] =
|
||||
'Object{' +
|
||||
Object.keys(obj)
|
||||
.slice(0, 6)
|
||||
.map((kk) => kk + (typeof obj[kk] === 'string' ? `=${(obj[kk] as string).length}ch` : ''))
|
||||
.join(',') +
|
||||
'}';
|
||||
} else if (typeof v === 'string') {
|
||||
out[k] = v.length > 100 ? v.slice(0, 100) + `…(${v.length})` : v;
|
||||
} else {
|
||||
out[k] = v;
|
||||
}
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function summarizeMessages(msgs: any[]): ProbeMessageSummary[] {
|
||||
return (msgs ?? []).slice(0, 80).map((m) => ({
|
||||
id: (m.id ?? '').slice(-8),
|
||||
role: m.role,
|
||||
cLen: (m.content ?? '').length,
|
||||
rLen: (m.reasoning?.content ?? '').length,
|
||||
tools: (m.tools ?? []).length,
|
||||
chN: (m.children ?? []).length,
|
||||
}));
|
||||
}
|
||||
|
||||
function shortStack(): string {
|
||||
const raw = new Error('probe-stack').stack ?? '';
|
||||
return raw
|
||||
.split('\n')
|
||||
.slice(3)
|
||||
.filter((l) => !l.includes('probe-events') && !l.includes('node_modules'))
|
||||
.map((l) => l.trim().replace(/^at\s+/, ''))
|
||||
.slice(0, 6)
|
||||
.join(' ← ');
|
||||
}
|
||||
|
||||
function recordAgentEvent(args: {
|
||||
transport: 'ws' | 'sse';
|
||||
opId: string | null;
|
||||
agentEvent: any;
|
||||
eventId?: string | null;
|
||||
rawLen?: number;
|
||||
}): void {
|
||||
const { transport, opId, agentEvent, eventId, rawLen } = args;
|
||||
if (!agentEvent || typeof agentEvent !== 'object') return;
|
||||
events.push({
|
||||
t: now(),
|
||||
transport,
|
||||
opIdTail: (opId ?? '').slice(-10),
|
||||
eventId: eventId ?? null,
|
||||
type: agentEvent.type,
|
||||
stepIndex: agentEvent.stepIndex,
|
||||
dataKeys: agentEvent.data ? Object.keys(agentEvent.data) : [],
|
||||
data: summarizeData(agentEvent.data) as Record<string, unknown>,
|
||||
rawLen,
|
||||
});
|
||||
}
|
||||
|
||||
// ── 1. Patch window.WebSocket for gateway WS events ────────────────
|
||||
|
||||
if (!w.__PROBE_ORIG_WEBSOCKET) w.__PROBE_ORIG_WEBSOCKET = w.WebSocket;
|
||||
const OrigWS = w.__PROBE_ORIG_WEBSOCKET;
|
||||
|
||||
function extractOpIdFromWsUrl(url: string | URL): string | null {
|
||||
const m = String(url ?? '').match(/operationId=([^&]+)/);
|
||||
return m ? decodeURIComponent(m[1]) : null;
|
||||
}
|
||||
|
||||
function isGatewayWs(url: string | URL): boolean {
|
||||
return String(url ?? '').includes('operationId=');
|
||||
}
|
||||
|
||||
function handleWsFrame(rawData: unknown, opId: string | null): void {
|
||||
const rawLen = typeof rawData === 'string' ? rawData.length : -1;
|
||||
let parsed: any;
|
||||
try {
|
||||
parsed = typeof rawData === 'string' ? JSON.parse(rawData) : null;
|
||||
} catch {
|
||||
events.push({
|
||||
t: now(),
|
||||
transport: 'ws',
|
||||
opIdTail: (opId ?? '').slice(-10),
|
||||
type: '_PARSE_ERROR_',
|
||||
raw: typeof rawData === 'string' && rawData.length < 400 ? rawData : '(non-string or large)',
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (!parsed) return;
|
||||
|
||||
if (parsed.type === 'agent_event') {
|
||||
recordAgentEvent({
|
||||
transport: 'ws',
|
||||
opId,
|
||||
agentEvent: parsed.event,
|
||||
eventId: parsed.id,
|
||||
rawLen,
|
||||
});
|
||||
} else {
|
||||
events.push({
|
||||
t: now(),
|
||||
transport: 'ws',
|
||||
opIdTail: (opId ?? '').slice(-10),
|
||||
type: '_SERVER_MSG_',
|
||||
serverType: parsed.type,
|
||||
rawLen,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Wrap the constructor. Instance `constructor` will still reflect OrigWS
|
||||
// (we share prototypes), so use the `_WS_OPEN_` sentinel events to confirm
|
||||
// the patch is firing.
|
||||
function PatchedWebSocket(this: WebSocket, url: string | URL, protocols?: string | string[]) {
|
||||
const ws: WebSocket = protocols == null ? new OrigWS(url) : new OrigWS(url, protocols);
|
||||
const opId = extractOpIdFromWsUrl(url);
|
||||
if (!isGatewayWs(url)) return ws;
|
||||
|
||||
events.push({
|
||||
t: now(),
|
||||
transport: 'ws',
|
||||
opIdTail: (opId ?? '').slice(-10),
|
||||
type: '_WS_OPEN_',
|
||||
url: String(url),
|
||||
});
|
||||
|
||||
// One observer listener that always fires, regardless of how the consumer
|
||||
// (AgentStreamClient uses `ws.onmessage = …`) subscribes.
|
||||
ws.addEventListener('message', (e) => {
|
||||
try {
|
||||
handleWsFrame((e as MessageEvent).data, opId);
|
||||
} catch {
|
||||
/* swallow */
|
||||
}
|
||||
});
|
||||
|
||||
ws.addEventListener('close', () => {
|
||||
events.push({
|
||||
t: now(),
|
||||
transport: 'ws',
|
||||
opIdTail: (opId ?? '').slice(-10),
|
||||
type: '_WS_CLOSE_',
|
||||
});
|
||||
});
|
||||
|
||||
return ws;
|
||||
}
|
||||
|
||||
// Preserve prototype + static fields so `instanceof WebSocket` and
|
||||
// `WebSocket.OPEN` constants still work.
|
||||
(PatchedWebSocket as unknown as { prototype: WebSocket }).prototype = OrigWS.prototype;
|
||||
for (const k of Object.keys(OrigWS) as Array<keyof typeof OrigWS>) {
|
||||
try {
|
||||
(PatchedWebSocket as any)[k] = (OrigWS as any)[k];
|
||||
} catch {
|
||||
/* readonly */
|
||||
}
|
||||
}
|
||||
(['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'] as const).forEach((k) => {
|
||||
(PatchedWebSocket as any)[k] = (OrigWS as any)[k];
|
||||
});
|
||||
w.WebSocket = PatchedWebSocket as unknown as typeof WebSocket;
|
||||
|
||||
// ── 2. Patch window.fetch for `/api/agent/stream` (direct-mode SSE) ─
|
||||
|
||||
if (!w.__PROBE_ORIG_FETCH) w.__PROBE_ORIG_FETCH = w.fetch.bind(w);
|
||||
const origFetch = w.__PROBE_ORIG_FETCH;
|
||||
|
||||
function isAgentStreamUrl(input: RequestInfo | URL): boolean {
|
||||
let url = '';
|
||||
if (typeof input === 'string') url = input;
|
||||
else if (input instanceof URL) url = input.toString();
|
||||
else if (input && typeof (input as Request).url === 'string') url = (input as Request).url;
|
||||
return url.includes('/api/agent/stream');
|
||||
}
|
||||
|
||||
function extractOpIdFromHttpUrl(input: RequestInfo | URL): string | null {
|
||||
const url = typeof input === 'string' ? input : (input as Request | URL).toString();
|
||||
const m = url.match(/operationId=([^&]+)/);
|
||||
return m ? decodeURIComponent(m[1]) : null;
|
||||
}
|
||||
|
||||
function pushFromSSEFrame(rawFrame: string, opId: string | null): void {
|
||||
const lines = rawFrame.split('\n');
|
||||
let dataJson = '';
|
||||
let evtName = 'message';
|
||||
for (const line of lines) {
|
||||
if (line.startsWith('event:')) evtName = line.slice(6).trim();
|
||||
else if (line.startsWith('data:')) dataJson += line.slice(5).trim();
|
||||
}
|
||||
if (!dataJson) return;
|
||||
let parsed: any;
|
||||
try {
|
||||
parsed = JSON.parse(dataJson);
|
||||
} catch {
|
||||
events.push({
|
||||
t: now(),
|
||||
transport: 'sse',
|
||||
opIdTail: (opId ?? '').slice(-10),
|
||||
type: '_PARSE_ERROR_',
|
||||
sseEvent: evtName,
|
||||
raw: dataJson.length > 400 ? dataJson.slice(0, 400) + '…' : dataJson,
|
||||
});
|
||||
return;
|
||||
}
|
||||
recordAgentEvent({
|
||||
transport: 'sse',
|
||||
opId,
|
||||
agentEvent: parsed,
|
||||
eventId: null,
|
||||
rawLen: dataJson.length,
|
||||
});
|
||||
}
|
||||
|
||||
async function teeAndDrain(response: Response, opId: string | null): Promise<Response> {
|
||||
if (!response.body) return response;
|
||||
const [a, b] = response.body.tee();
|
||||
|
||||
void (async () => {
|
||||
const reader = b.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buf = '';
|
||||
try {
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
buf += decoder.decode(value, { stream: true });
|
||||
let idx: number;
|
||||
|
||||
while ((idx = buf.indexOf('\n\n')) !== -1) {
|
||||
const frame = buf.slice(0, idx);
|
||||
buf = buf.slice(idx + 2);
|
||||
if (frame.trim()) pushFromSSEFrame(frame, opId);
|
||||
}
|
||||
}
|
||||
if (buf.trim()) pushFromSSEFrame(buf, opId);
|
||||
} catch (e: any) {
|
||||
events.push({
|
||||
t: now(),
|
||||
transport: 'sse',
|
||||
opIdTail: (opId ?? '').slice(-10),
|
||||
type: '_TEE_ERROR_',
|
||||
message: String(e?.message ?? e),
|
||||
});
|
||||
}
|
||||
})();
|
||||
|
||||
return new Response(a, {
|
||||
headers: response.headers,
|
||||
status: response.status,
|
||||
statusText: response.statusText,
|
||||
});
|
||||
}
|
||||
|
||||
w.fetch = async function patchedFetch(input: RequestInfo | URL, init?: RequestInit) {
|
||||
const response = await origFetch(input as any, init);
|
||||
if (!isAgentStreamUrl(input)) return response;
|
||||
const opId = extractOpIdFromHttpUrl(input);
|
||||
const url =
|
||||
typeof input === 'string'
|
||||
? input.split('?')[0]
|
||||
: (input as Request | URL).toString().split('?')[0];
|
||||
events.push({
|
||||
t: now(),
|
||||
transport: 'sse',
|
||||
opIdTail: (opId ?? '').slice(-10),
|
||||
type: '_CONNECTED_',
|
||||
url,
|
||||
status: response.status,
|
||||
});
|
||||
return teeAndDrain(response, opId);
|
||||
} as typeof fetch;
|
||||
|
||||
// ── 3. Wrap store actions (best-effort for "who called replace") ────
|
||||
|
||||
// Side-global stash for the original chat-store actions. Re-installs ALWAYS
|
||||
// rewrap from the originals so updates to the probe body take effect
|
||||
// without a page reload — using only a `__probeWrapped` flag on the chat
|
||||
// state object would freeze the first-installed wrapper across re-installs.
|
||||
declare global {
|
||||
interface Window {
|
||||
__PROBE_ORIG_REFRESH_MESSAGES?: any;
|
||||
__PROBE_ORIG_REPLACE_MESSAGES?: any;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const chat = w.__LOBE_STORES?.chat?.();
|
||||
if (chat) {
|
||||
// First-time install: cache the originals. Re-install: restore from
|
||||
// the cached originals before wrapping again.
|
||||
if (!w.__PROBE_ORIG_REFRESH_MESSAGES) w.__PROBE_ORIG_REFRESH_MESSAGES = chat.refreshMessages;
|
||||
if (!w.__PROBE_ORIG_REPLACE_MESSAGES) w.__PROBE_ORIG_REPLACE_MESSAGES = chat.replaceMessages;
|
||||
const origRefresh = w.__PROBE_ORIG_REFRESH_MESSAGES;
|
||||
const origReplace = w.__PROBE_ORIG_REPLACE_MESSAGES;
|
||||
chat.refreshMessages = origRefresh;
|
||||
chat.replaceMessages = origReplace;
|
||||
|
||||
chat.refreshMessages = async function probeRefresh(this: unknown, ...args: any[]) {
|
||||
calls.push({
|
||||
t: now(),
|
||||
name: 'refreshMessages',
|
||||
args: { context: args[0] ?? null },
|
||||
stack: shortStack(),
|
||||
});
|
||||
return origRefresh.apply(this, args);
|
||||
};
|
||||
chat.replaceMessages = function probeReplace(this: unknown, ...args: any[]) {
|
||||
const msgs = (args[0] as any[]) ?? [];
|
||||
const snapshot = msgs.slice(-2).map((m) => ({
|
||||
id: (m.id ?? '').slice(-8),
|
||||
role: m.role,
|
||||
cLen: (m.content ?? '').length,
|
||||
rLen: (m.reasoning?.content ?? '').length,
|
||||
updatedAt: m.updatedAt,
|
||||
}));
|
||||
calls.push({
|
||||
t: now(),
|
||||
name: 'replaceMessages',
|
||||
args: { count: msgs.length, params: args[1] ?? null, snapshot } as any,
|
||||
stack: shortStack(),
|
||||
});
|
||||
|
||||
// Pair the call with a mutation row so the analyzer can build a
|
||||
// single ordered timeline across replaceMessages + dispatchMessage.
|
||||
const stackTop = shortStack().split(' ← ')[0]?.slice(0, 80);
|
||||
const last = msgs.at(-1);
|
||||
const lastSum = last
|
||||
? {
|
||||
id: (last.id ?? '').slice(-8),
|
||||
role: last.role,
|
||||
cLen: (last.content ?? '').length,
|
||||
rLen: (last.reasoning?.content ?? '').length,
|
||||
updatedAt: last.updatedAt,
|
||||
}
|
||||
: undefined;
|
||||
const params: any = args[1] ?? {};
|
||||
const ctxKey = params.context
|
||||
? `main_${params.context.agentId ?? '?'}_${
|
||||
params.context.topicId ? 'tpc_' + params.context.topicId : 'new'
|
||||
}`.replace('main_tpc_', 'main_') // crude key inference
|
||||
: '(no-ctx)';
|
||||
mutations.push({
|
||||
t: now(),
|
||||
key: ctxKey,
|
||||
n: msgs.length,
|
||||
last: lastSum,
|
||||
delta: `replaceMessages(action=${params.action ?? '-'}) src=${stackTop ?? '-'}`,
|
||||
});
|
||||
|
||||
return origReplace.apply(this, args);
|
||||
};
|
||||
}
|
||||
} catch (e: any) {
|
||||
calls.push({ t: now(), name: '_WRAP_ERROR_', error: String(e?.message ?? e) });
|
||||
}
|
||||
|
||||
// ── 3.5. Mutation log — wrap the TWO ChatStore writers (replaceMessages,
|
||||
// internal_dispatchMessage) to record EVERY dbMessagesMap[key] reference
|
||||
// change with a one-line "before/after last assistant message" delta. This
|
||||
// reveals dispatchMessage-driven collapses that the replaceMessages wrap
|
||||
// alone cannot see.
|
||||
|
||||
declare global {
|
||||
interface Window {
|
||||
__PROBE_ORIG_DISPATCH_MESSAGE?: any;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const chat = w.__LOBE_STORES?.chat?.();
|
||||
if (chat?.internal_dispatchMessage) {
|
||||
if (!w.__PROBE_ORIG_DISPATCH_MESSAGE)
|
||||
w.__PROBE_ORIG_DISPATCH_MESSAGE = chat.internal_dispatchMessage;
|
||||
const origDispatch = w.__PROBE_ORIG_DISPATCH_MESSAGE;
|
||||
chat.internal_dispatchMessage = origDispatch;
|
||||
|
||||
chat.internal_dispatchMessage = function probeDispatch(this: unknown, payload: any, ctx?: any) {
|
||||
// Snapshot BEFORE — read the would-be target key + last message.
|
||||
const before = (() => {
|
||||
try {
|
||||
const state = w.__LOBE_STORES?.chat?.();
|
||||
if (!state) return null;
|
||||
// Replicate state.internal_getConversationContext logic enough to
|
||||
// resolve a key — but most callers pass operationId on ctx, and
|
||||
// operationId-keyed lookup needs store internals. Easiest: snapshot
|
||||
// ALL keys' last-assistant cLen and compare BEFORE vs AFTER below.
|
||||
const map = state.dbMessagesMap ?? {};
|
||||
const out: Record<string, any> = {};
|
||||
for (const k of Object.keys(map)) {
|
||||
const last = (map[k] ?? []).at(-1);
|
||||
out[k] = last
|
||||
? {
|
||||
id: (last.id ?? '').slice(-8),
|
||||
cLen: (last.content ?? '').length,
|
||||
rLen: (last.reasoning?.content ?? '').length,
|
||||
n: map[k].length,
|
||||
}
|
||||
: { n: 0 };
|
||||
}
|
||||
return out;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
})();
|
||||
|
||||
const result = origDispatch.apply(this, [payload, ctx]);
|
||||
|
||||
// Snapshot AFTER — find which key(s) actually changed.
|
||||
try {
|
||||
const state = w.__LOBE_STORES?.chat?.();
|
||||
if (state && before) {
|
||||
const map = state.dbMessagesMap ?? {};
|
||||
for (const k of Object.keys(map)) {
|
||||
const last = (map[k] ?? []).at(-1);
|
||||
const beforeSnap = before[k];
|
||||
const afterSnap = last
|
||||
? {
|
||||
id: (last.id ?? '').slice(-8),
|
||||
cLen: (last.content ?? '').length,
|
||||
rLen: (last.reasoning?.content ?? '').length,
|
||||
n: map[k].length,
|
||||
}
|
||||
: { n: 0 };
|
||||
const changed =
|
||||
!beforeSnap ||
|
||||
beforeSnap.n !== afterSnap.n ||
|
||||
beforeSnap.id !== (afterSnap as any).id ||
|
||||
beforeSnap.cLen !== (afterSnap as any).cLen ||
|
||||
beforeSnap.rLen !== (afterSnap as any).rLen;
|
||||
if (!changed) continue;
|
||||
let delta = '';
|
||||
if (beforeSnap?.id !== undefined && beforeSnap.id !== (afterSnap as any).id)
|
||||
delta += `id:${beforeSnap.id}→${(afterSnap as any).id};`;
|
||||
if (
|
||||
beforeSnap?.cLen !== undefined &&
|
||||
(afterSnap as any).cLen !== undefined &&
|
||||
(afterSnap as any).cLen < beforeSnap.cLen
|
||||
)
|
||||
delta += `cLen↓${beforeSnap.cLen}→${(afterSnap as any).cLen};`;
|
||||
if (
|
||||
beforeSnap?.rLen !== undefined &&
|
||||
(afterSnap as any).rLen !== undefined &&
|
||||
(afterSnap as any).rLen < beforeSnap.rLen
|
||||
)
|
||||
delta += `rLen↓${beforeSnap.rLen}→${(afterSnap as any).rLen};`;
|
||||
if (beforeSnap?.n !== undefined && afterSnap.n < beforeSnap.n)
|
||||
delta += `n↓${beforeSnap.n}→${afterSnap.n};`;
|
||||
mutations.push({
|
||||
t: now(),
|
||||
key: k,
|
||||
n: afterSnap.n,
|
||||
last: (afterSnap as any).id ? (afterSnap as any) : undefined,
|
||||
prevLast: beforeSnap?.id ? beforeSnap : undefined,
|
||||
delta: delta || `dispatch:${payload?.type}`,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (e: any) {
|
||||
mutations.push({
|
||||
t: now(),
|
||||
key: '_DISPATCH_PROBE_ERROR_',
|
||||
n: -1,
|
||||
delta: String(e?.message ?? e),
|
||||
});
|
||||
}
|
||||
return result;
|
||||
};
|
||||
}
|
||||
} catch (e: any) {
|
||||
calls.push({ t: now(), name: '_DISPATCH_WRAP_ERROR_', error: String(e?.message ?? e) });
|
||||
}
|
||||
|
||||
// ── 4. Periodic per-key timeline snapshots ─────────────────────────
|
||||
|
||||
function captureTimeline(): void {
|
||||
try {
|
||||
const c = w.__LOBE_STORES?.chat?.();
|
||||
if (!c) return;
|
||||
const msgsMap = (c.messagesMap ?? {}) as Record<string, any[]>;
|
||||
const dbMap = (c.dbMessagesMap ?? {}) as Record<string, any[]>;
|
||||
const byKey: ProbeTimelineSample['byKey'] = {};
|
||||
for (const k of Object.keys(msgsMap)) {
|
||||
const display = msgsMap[k] ?? [];
|
||||
const db = dbMap[k] ?? [];
|
||||
if (display.length === 0 && db.length === 0) continue;
|
||||
byKey[k] = {
|
||||
n: display.length,
|
||||
dbN: db.length,
|
||||
msgs: summarizeMessages(display),
|
||||
};
|
||||
}
|
||||
const ops = Object.values((c.operations ?? {}) as Record<string, any>);
|
||||
timeline.push({
|
||||
t: now(),
|
||||
activeTopic: ((c.activeTopicId as string | null) ?? '').slice(-10) || null,
|
||||
keys: Object.keys(byKey),
|
||||
byKey,
|
||||
runOps: ops.filter((o: any) => o.status === 'running').length,
|
||||
});
|
||||
} catch (e: any) {
|
||||
timeline.push({
|
||||
t: now(),
|
||||
activeTopic: null,
|
||||
keys: [],
|
||||
byKey: {},
|
||||
runOps: 0,
|
||||
err: e?.message ?? String(e),
|
||||
});
|
||||
}
|
||||
}
|
||||
captureTimeline();
|
||||
if (w.__PROBE_TIMELINE_TIMER) clearInterval(w.__PROBE_TIMELINE_TIMER);
|
||||
w.__PROBE_TIMELINE_TIMER = setInterval(captureTimeline, 200);
|
||||
|
||||
// ── 5. Tab-switch helpers ──────────────────────────────────────────
|
||||
|
||||
function listTopBarTabs(): HTMLElement[] {
|
||||
return Array.from(
|
||||
document.querySelectorAll<HTMLElement>(
|
||||
'[data-insp-path*="TabItem.tsx"][data-contextmenu-trigger]',
|
||||
),
|
||||
).filter((t) => t.getBoundingClientRect().top < 30);
|
||||
}
|
||||
|
||||
w.__listTabs = () =>
|
||||
listTopBarTabs().map((t, i) => ({
|
||||
i,
|
||||
key: t.getAttribute('data-contextmenu-trigger'),
|
||||
active: t.getAttribute('data-active') === 'true',
|
||||
title: (t.innerText ?? '').slice(0, 60),
|
||||
}));
|
||||
|
||||
w.__clickTabByKey = (key: string) => {
|
||||
const tab = listTopBarTabs().find((t) => t.getAttribute('data-contextmenu-trigger') === key);
|
||||
if (!tab) return 'not found: ' + key;
|
||||
if (tab.getAttribute('data-active') === 'true') return 'already active: ' + key;
|
||||
tab.click();
|
||||
return 'clicked key=' + key;
|
||||
};
|
||||
|
||||
w.__PROBE_EVENT = (name: string) => {
|
||||
calls.push({ t: now(), name: 'MARK:' + name });
|
||||
};
|
||||
|
||||
// `run.ts` wraps the bundle in an IIFE and appends a `return <confirmation>`
|
||||
// after the bundle body — agent-browser then prints the confirmation back to
|
||||
// the operator. Nothing to do here at the end of the module body.
|
||||
@@ -0,0 +1,204 @@
|
||||
// LobeHub chat streaming time-series probe.
|
||||
//
|
||||
// Inject into the renderer (via agent-browser eval) to record store + DOM
|
||||
// snapshots every 200ms during a streaming session. Designed to surface
|
||||
// "UI rolled back to an earlier state" symptoms — especially around
|
||||
// gateway-mode tab switches that happen while the assistant is still writing.
|
||||
//
|
||||
// Usage:
|
||||
// agent-browser --cdp 9222 eval --stdin < probe.js
|
||||
// # ...do test interactions, call window.__PROBE_EVENT('LABEL') to mark moments...
|
||||
// agent-browser --cdp 9222 eval --stdin < probe-dump.js > /tmp/probe.json
|
||||
// node analyze.mjs /tmp/probe.json
|
||||
//
|
||||
// What it captures per sample:
|
||||
// - activeTopicId
|
||||
// - msgN: top-level messages in chat.messagesMap for this topic
|
||||
// - childN: total assistantGroup.children blocks across all msgs (THIS is
|
||||
// where streaming content actually lives — top-level assistantGroup stays empty)
|
||||
// - cT / rT / toolT: totals across messages AND their children
|
||||
// (content, reasoning, tool-call count)
|
||||
// - perMsg: per-message breakdown so regressions can be located precisely
|
||||
// - runOps: number of running operations (execServerAgentRuntime etc.)
|
||||
// - domLen: total innerText length of the rendered chat list area
|
||||
// - ind: visible UI indicators (Search pages, Crawled pages, Deeply Thought, Sending)
|
||||
//
|
||||
// Event markers: window.__PROBE_EVENT('NAME') records {t, name} into
|
||||
// __PROBE_EVENTS, used by the analyzer to align state changes with
|
||||
// user-driven actions (SENT, AWAY_1, BACK_1, ...).
|
||||
|
||||
(function () {
|
||||
if (window.__PROBE_TIMER) clearInterval(window.__PROBE_TIMER);
|
||||
window.__PROBE_SAMPLES = [];
|
||||
window.__PROBE_EVENTS = [];
|
||||
const t0 = Date.now();
|
||||
|
||||
function snapshot() {
|
||||
try {
|
||||
const chat = window.__LOBE_STORES.chat();
|
||||
const topicId = chat.activeTopicId;
|
||||
const idTail = topicId ? topicId.replace('tpc_', '') : null;
|
||||
const keys = Object.keys(chat.messagesMap || {});
|
||||
|
||||
// Collect messages for the active topic. Before a topic is committed,
|
||||
// optimistic messages live under the `<agentScope>_new` key — fall
|
||||
// back to those when no topic is active yet.
|
||||
let msgs = [];
|
||||
if (idTail) {
|
||||
keys.forEach((k) => {
|
||||
if (k.includes(idTail)) msgs = msgs.concat(chat.messagesMap[k] || []);
|
||||
});
|
||||
} else {
|
||||
keys
|
||||
.filter((k) => k.endsWith('_new'))
|
||||
.forEach((k) => {
|
||||
msgs = msgs.concat(chat.messagesMap[k] || []);
|
||||
});
|
||||
}
|
||||
|
||||
// Walk top-level + assistantGroup.children. children carry the actual
|
||||
// streamed content / reasoning / tool calls; the parent assistantGroup
|
||||
// remains a placeholder (cLen=0, rLen=0) for its whole lifetime.
|
||||
let totalContent = 0;
|
||||
let totalReason = 0;
|
||||
let totalTools = 0;
|
||||
let childCount = 0;
|
||||
const perMsg = msgs.map((m) => {
|
||||
const cLen = (m.content || '').length;
|
||||
const rLen = ((m.reasoning && m.reasoning.content) || '').length;
|
||||
const tools = (m.tools || []).length;
|
||||
totalContent += cLen;
|
||||
totalReason += rLen;
|
||||
totalTools += tools;
|
||||
|
||||
const children = m.children || [];
|
||||
let chC = 0;
|
||||
let chR = 0;
|
||||
let chT = 0;
|
||||
children.forEach((c) => {
|
||||
chC += (c.content || '').length;
|
||||
chR += ((c.reasoning && c.reasoning.content) || '').length;
|
||||
chT += (c.tools || []).length;
|
||||
});
|
||||
totalContent += chC;
|
||||
totalReason += chR;
|
||||
totalTools += chT;
|
||||
childCount += children.length;
|
||||
|
||||
return {
|
||||
id: (m.id || '').slice(-8),
|
||||
role: m.role,
|
||||
cLen,
|
||||
rLen,
|
||||
tools,
|
||||
chCount: children.length,
|
||||
chC,
|
||||
chR,
|
||||
chT,
|
||||
};
|
||||
});
|
||||
|
||||
const ops = Object.values(chat.operations || {});
|
||||
const runningOps = ops.filter((o) => o.status === 'running');
|
||||
|
||||
// DOM probe: total rendered text in the chat scroll area (proxy for
|
||||
// "how much is actually visible to the user").
|
||||
const convScroll =
|
||||
document.querySelector(
|
||||
'[data-chat-list], [class*="ChatList"], [class*="ConversationList"]',
|
||||
) ||
|
||||
document.querySelector('main [class*="scroll"]') ||
|
||||
document.querySelector('main');
|
||||
const domTxt = convScroll ? convScroll.innerText || '' : '';
|
||||
|
||||
const bodyTxt = document.body.innerText || '';
|
||||
const searchMatches = (bodyTxt.match(/Search pages?:|Searched the web/g) || []).length;
|
||||
const crawlMatches = (bodyTxt.match(/Crawl(ed|ing) pages?/g) || []).length;
|
||||
|
||||
window.__PROBE_SAMPLES.push({
|
||||
t: Date.now() - t0,
|
||||
topicId,
|
||||
msgN: msgs.length,
|
||||
childN: childCount,
|
||||
cT: totalContent,
|
||||
rT: totalReason,
|
||||
toolT: totalTools,
|
||||
perMsg,
|
||||
runOps: runningOps.length,
|
||||
runOpTypes: runningOps.map((o) => o.type),
|
||||
domLen: domTxt.length,
|
||||
ind: {
|
||||
search: searchMatches,
|
||||
crawl: crawlMatches,
|
||||
sending: bodyTxt.includes('Sending message'),
|
||||
deeplyThinking: bodyTxt.includes('Deeply Thinking'),
|
||||
deeplyThought: bodyTxt.includes('Deeply Thought'),
|
||||
},
|
||||
});
|
||||
} catch (e) {
|
||||
window.__PROBE_SAMPLES.push({ t: Date.now() - t0, err: e.message });
|
||||
}
|
||||
}
|
||||
|
||||
snapshot();
|
||||
window.__PROBE_TIMER = setInterval(snapshot, 200);
|
||||
window.__PROBE_EVENT = function (name) {
|
||||
window.__PROBE_EVENTS.push({ t: Date.now() - t0, name });
|
||||
};
|
||||
|
||||
// Tab-switch helpers installed alongside the probe.
|
||||
//
|
||||
// The Electron tab bar mounts each tab as a div with data-insp-path
|
||||
// ending in `TabItem.tsx:...`. The active tab is marked with
|
||||
// data-active="true". DO NOT search by innerText — the active tab's text
|
||||
// includes a ` · <agent name>` suffix that produces false matches when
|
||||
// your search string happens to overlap with the agent name.
|
||||
function listTabs() {
|
||||
return Array.from(
|
||||
document.querySelectorAll('[data-insp-path*="TabItem.tsx"][data-contextmenu-trigger]'),
|
||||
).filter((t) => t.getBoundingClientRect().top < 30);
|
||||
}
|
||||
function tabKey(el) {
|
||||
// Stable for the tab's lifetime; survives focus changes.
|
||||
return el.getAttribute('data-contextmenu-trigger');
|
||||
}
|
||||
function findActiveTab() {
|
||||
return listTabs().find((t) => t.getAttribute('data-active') === 'true') || null;
|
||||
}
|
||||
|
||||
// Click by stable key captured earlier (preferred for round-trips).
|
||||
window.__clickTabByKey = function (key) {
|
||||
const tab = listTabs().find((t) => tabKey(t) === key);
|
||||
if (!tab) return 'not found: key=' + key;
|
||||
if (tab.getAttribute('data-active') === 'true') return 'already active: ' + key;
|
||||
tab.click();
|
||||
return 'clicked key=' + key;
|
||||
};
|
||||
|
||||
// Click by index in the tab strip (0-based, left-to-right).
|
||||
window.__clickTabByIndex = function (i) {
|
||||
const tabs = listTabs();
|
||||
if (i < 0 || i >= tabs.length) return 'index out of range: ' + i + '/' + tabs.length;
|
||||
const t = tabs[i];
|
||||
if (t.getAttribute('data-active') === 'true') return 'already active: i=' + i;
|
||||
t.click();
|
||||
return 'clicked i=' + i + ' key=' + tabKey(t);
|
||||
};
|
||||
|
||||
// Snapshot all tabs in order: [{key, active, title (first 60 chars of innerText)}]
|
||||
window.__listTabs = function () {
|
||||
return listTabs().map((t, i) => ({
|
||||
i,
|
||||
key: tabKey(t),
|
||||
active: t.getAttribute('data-active') === 'true',
|
||||
title: (t.innerText || '').slice(0, 60),
|
||||
}));
|
||||
};
|
||||
|
||||
window.__activeTabKey = function () {
|
||||
const a = findActiveTab();
|
||||
return a ? tabKey(a) : null;
|
||||
};
|
||||
|
||||
return 'probe installed';
|
||||
})();
|
||||
@@ -0,0 +1,211 @@
|
||||
// CLI for the agent-gateway probe.
|
||||
//
|
||||
// Bundles the TS probes with esbuild, pipes them into `agent-browser eval`,
|
||||
// and persists dumps under `.agent-gateway/` (gitignored) for later use as
|
||||
// streaming-replay test fixtures.
|
||||
//
|
||||
// Commands:
|
||||
// bun run .agents/skills/local-testing/scripts/agent-gateway/run.ts install
|
||||
// Bundle probe-events.ts and inject into the CDP-attached browser.
|
||||
// Re-installing clears all buffers and re-patches WebSocket / fetch.
|
||||
//
|
||||
// bun run .agents/skills/local-testing/scripts/agent-gateway/run.ts dump [name]
|
||||
// Stop the timeline timer, fetch the capture as JSON, write it to
|
||||
// `.agent-gateway/<name>-<YYYYMMDD-HHmmss>.json`. `name` defaults to
|
||||
// `dump`. Prints the absolute path written.
|
||||
//
|
||||
// bun run .agents/skills/local-testing/scripts/agent-gateway/run.ts analyze [path]
|
||||
// Run analyze-events.ts on the dump. `path` defaults to the most
|
||||
// recently modified file in `.agent-gateway/`.
|
||||
//
|
||||
// Optional flags:
|
||||
// --cdp <port> CDP port (default 9222)
|
||||
// --browser <bin> agent-browser binary (default 'agent-browser')
|
||||
|
||||
import { spawn } from 'node:child_process';
|
||||
import { mkdirSync, readdirSync, statSync, writeFileSync } from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import { fileURLToPath } from 'node:url';
|
||||
|
||||
const SCRIPT_DIR = path.dirname(fileURLToPath(import.meta.url));
|
||||
// .agents/skills/local-testing/scripts/agent-gateway/ → 5 levels up
|
||||
const PROJECT_ROOT = path.resolve(SCRIPT_DIR, '../../../../..');
|
||||
const DUMP_DIR = path.join(PROJECT_ROOT, '.agent-gateway');
|
||||
|
||||
interface Flags {
|
||||
browser: string;
|
||||
cdp: string;
|
||||
positional: string[];
|
||||
}
|
||||
|
||||
function parseFlags(argv: string[]): Flags {
|
||||
const out: Flags = { cdp: '9222', browser: 'agent-browser', positional: [] };
|
||||
for (let i = 0; i < argv.length; i++) {
|
||||
const a = argv[i];
|
||||
if (a === '--cdp') out.cdp = argv[++i] ?? out.cdp;
|
||||
else if (a === '--browser') out.browser = argv[++i] ?? out.browser;
|
||||
else out.positional.push(a);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
async function bundle(entry: string): Promise<string> {
|
||||
// Bun.build is built into the Bun runtime — no external dep needed.
|
||||
const r = await Bun.build({
|
||||
entrypoints: [path.join(SCRIPT_DIR, entry)],
|
||||
target: 'browser',
|
||||
format: 'esm',
|
||||
minify: false,
|
||||
});
|
||||
if (!r.success) {
|
||||
const msgs = r.logs.map((l) => `${l.level}: ${l.message}`).join('\n');
|
||||
throw new Error(`bundle failed for ${entry}:\n${msgs}`);
|
||||
}
|
||||
return await r.outputs[0].text();
|
||||
}
|
||||
|
||||
function wrapIife(body: string, returnExpr: string): string {
|
||||
// Wrap as an IIFE that swallows the bundled top-level (top-level `const`
|
||||
// declarations get scoped to the IIFE, so re-injection doesn't conflict)
|
||||
// and returns the configured expression — which `agent-browser eval`
|
||||
// captures and prints to stdout.
|
||||
return `(() => {\n${body}\n;return ${returnExpr};\n})()`;
|
||||
}
|
||||
|
||||
function runAgentBrowserEval(flags: Flags, script: string): Promise<string> {
|
||||
return new Promise((resolveP, rejectP) => {
|
||||
const child = spawn(flags.browser, ['--cdp', flags.cdp, 'eval', '--stdin'], {
|
||||
stdio: ['pipe', 'pipe', 'inherit'],
|
||||
});
|
||||
let stdout = '';
|
||||
child.stdout.on('data', (chunk: Buffer) => {
|
||||
stdout += chunk.toString('utf8');
|
||||
});
|
||||
child.on('error', rejectP);
|
||||
child.on('close', (code) => {
|
||||
if (code === 0) resolveP(stdout);
|
||||
else rejectP(new Error(`agent-browser exited ${code}`));
|
||||
});
|
||||
child.stdin.write(script);
|
||||
child.stdin.end();
|
||||
});
|
||||
}
|
||||
|
||||
// agent-browser prints eval results as JSON (string values are quoted).
|
||||
function unquoteAgentBrowserResult(raw: string): string {
|
||||
const trimmed = raw.trim();
|
||||
if (trimmed.startsWith('"') && trimmed.endsWith('"')) {
|
||||
try {
|
||||
return JSON.parse(trimmed) as string;
|
||||
} catch {
|
||||
/* fall through */
|
||||
}
|
||||
}
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
function isoStamp(): string {
|
||||
const d = new Date();
|
||||
const yyyy = d.getFullYear();
|
||||
const mm = String(d.getMonth() + 1).padStart(2, '0');
|
||||
const dd = String(d.getDate()).padStart(2, '0');
|
||||
const hh = String(d.getHours()).padStart(2, '0');
|
||||
const mi = String(d.getMinutes()).padStart(2, '0');
|
||||
const ss = String(d.getSeconds()).padStart(2, '0');
|
||||
return `${yyyy}${mm}${dd}-${hh}${mi}${ss}`;
|
||||
}
|
||||
|
||||
function ensureDumpDir(): void {
|
||||
mkdirSync(DUMP_DIR, { recursive: true });
|
||||
}
|
||||
|
||||
function latestDump(): string | null {
|
||||
ensureDumpDir();
|
||||
const entries = readdirSync(DUMP_DIR)
|
||||
.filter((f) => f.endsWith('.json'))
|
||||
.map((f) => ({ f, mtime: statSync(path.join(DUMP_DIR, f)).mtimeMs }))
|
||||
.sort((a, b) => b.mtime - a.mtime);
|
||||
return entries[0] ? path.join(DUMP_DIR, entries[0].f) : null;
|
||||
}
|
||||
|
||||
// ── Commands ────────────────────────────────────────────────────────
|
||||
|
||||
async function cmdInstall(flags: Flags): Promise<void> {
|
||||
const body = await bundle('probe-events.ts');
|
||||
const installMsg = JSON.stringify(
|
||||
'events probe installed: WebSocket+fetch interception. ' +
|
||||
'WS captures operationId= sockets (gateway), fetch captures /api/agent/stream (direct).',
|
||||
);
|
||||
const script = wrapIife(body, installMsg);
|
||||
const out = await runAgentBrowserEval(flags, script);
|
||||
console.log(unquoteAgentBrowserResult(out));
|
||||
}
|
||||
|
||||
async function cmdDump(flags: Flags): Promise<void> {
|
||||
const name = flags.positional[1] ?? 'dump';
|
||||
const body = await bundle('probe-dump.ts');
|
||||
const script = wrapIife(body, 'window.__PROBE_LAST_DUMP_JSON');
|
||||
const raw = await runAgentBrowserEval(flags, script);
|
||||
const json = unquoteAgentBrowserResult(raw);
|
||||
ensureDumpDir();
|
||||
const filename = `${name}-${isoStamp()}.json`;
|
||||
const dumpPath = path.join(DUMP_DIR, filename);
|
||||
writeFileSync(dumpPath, json, 'utf8');
|
||||
// Validate by parsing the meta header so we error early on bad capture
|
||||
try {
|
||||
const parsed = JSON.parse(json) as {
|
||||
meta?: { eventCount?: number; callCount?: number; sampleCount?: number };
|
||||
};
|
||||
const meta = parsed.meta ?? {};
|
||||
console.log(
|
||||
`wrote ${dumpPath} (${json.length} bytes events=${meta.eventCount ?? '?'} ` +
|
||||
`calls=${meta.callCount ?? '?'} samples=${meta.sampleCount ?? '?'})`,
|
||||
);
|
||||
} catch {
|
||||
console.log(`wrote ${dumpPath} (${json.length} bytes — JSON.parse failed; see file)`);
|
||||
}
|
||||
}
|
||||
|
||||
async function cmdAnalyze(flags: Flags): Promise<void> {
|
||||
const target = flags.positional[1] ?? latestDump();
|
||||
if (!target) {
|
||||
console.error('no dump file found. run `dump` first or pass a path.');
|
||||
process.exit(1);
|
||||
}
|
||||
const child = spawn('bun', ['run', path.join(SCRIPT_DIR, 'analyze-events.ts'), target], {
|
||||
stdio: 'inherit',
|
||||
});
|
||||
await new Promise<void>((resolveP, rejectP) => {
|
||||
child.on('error', rejectP);
|
||||
child.on('close', (code) => (code === 0 ? resolveP() : rejectP(new Error(`exit ${code}`))));
|
||||
});
|
||||
}
|
||||
|
||||
// ── Entry point ─────────────────────────────────────────────────────
|
||||
|
||||
const flags = parseFlags(process.argv.slice(2));
|
||||
const cmd = flags.positional[0];
|
||||
|
||||
const usage = `usage:
|
||||
bun run run.ts install [--cdp 9222]
|
||||
bun run run.ts dump [name] [--cdp 9222]
|
||||
bun run run.ts analyze [path]
|
||||
`;
|
||||
|
||||
if (!cmd) {
|
||||
console.error(usage);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
try {
|
||||
if (cmd === 'install') await cmdInstall(flags);
|
||||
else if (cmd === 'dump') await cmdDump(flags);
|
||||
else if (cmd === 'analyze') await cmdAnalyze(flags);
|
||||
else {
|
||||
console.error(`unknown command: ${cmd}\n\n${usage}`);
|
||||
process.exit(1);
|
||||
}
|
||||
} catch (e: any) {
|
||||
console.error(e?.stack ?? e);
|
||||
process.exit(1);
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
// Run N round-trip tab switches with event markers timed against the probe.
|
||||
//
|
||||
// agent-browser --cdp 9222 eval --stdin < tab-switch.js
|
||||
//
|
||||
// Captures the currently-active tab as the BACK target and the rightmost
|
||||
// inactive tab as the AWAY target. Both are addressed by their stable
|
||||
// data-contextmenu-trigger key (NOT by visible title — the active tab's
|
||||
// innerText embeds a ` · <agent name>` suffix that breaks text matching).
|
||||
//
|
||||
// Fires the loop in the background and returns immediately so the
|
||||
// agent-browser eval doesn't have to await the full ROUND_TRIPS × DWELL_MS
|
||||
// duration. Wait on the `SWITCH_LOOP_DONE` event before dumping.
|
||||
//
|
||||
// Refuses to launch if a previous loop is still in flight.
|
||||
//
|
||||
// Requires probe.js to have been installed first (provides
|
||||
// window.__PROBE_EVENT / __listTabs / __clickTabByKey / __activeTabKey).
|
||||
|
||||
(function () {
|
||||
const ROUND_TRIPS = 4;
|
||||
const DWELL_MS = 10_000;
|
||||
|
||||
if (!window.__PROBE_EVENT || !window.__listTabs || !window.__clickTabByKey) {
|
||||
return 'probe not installed — eval probe.js first';
|
||||
}
|
||||
if (window.__SWITCH_LOOP_RUNNING) {
|
||||
return 'switch loop already running — wait for SWITCH_LOOP_DONE first';
|
||||
}
|
||||
|
||||
const tabs = window.__listTabs();
|
||||
const activeTab = tabs.find((t) => t.active);
|
||||
if (!activeTab) return 'no active tab — abort';
|
||||
|
||||
// Pick the first inactive tab as AWAY target. With multiple inactive tabs
|
||||
// you'll usually want the one that's stable across the test — feel free
|
||||
// to swap to tabs[tabs.length-1] if you want the rightmost.
|
||||
const inactives = tabs.filter((t) => !t.active);
|
||||
if (inactives.length === 0) return 'no inactive tab to switch to — abort';
|
||||
const awayTab = inactives.at(-1); // rightmost inactive
|
||||
|
||||
const BACK_KEY = activeTab.key;
|
||||
const AWAY_KEY = awayTab.key;
|
||||
|
||||
window.__SWITCH_LOOP_RUNNING = true;
|
||||
window.__PROBE_EVENT('SWITCH_LOOP_CONFIG:back=' + BACK_KEY + ',away=' + AWAY_KEY);
|
||||
|
||||
(async function () {
|
||||
function sleep(ms) {
|
||||
return new Promise((r) => setTimeout(r, ms));
|
||||
}
|
||||
|
||||
try {
|
||||
window.__PROBE_EVENT('SWITCH_LOOP_START');
|
||||
for (let i = 1; i <= ROUND_TRIPS; i++) {
|
||||
window.__PROBE_EVENT('AWAY_' + i);
|
||||
const awayResult = window.__clickTabByKey(AWAY_KEY);
|
||||
window.__PROBE_EVENT('AWAY_' + i + '_RES:' + awayResult.slice(0, 50));
|
||||
await sleep(DWELL_MS);
|
||||
|
||||
window.__PROBE_EVENT('BACK_' + i);
|
||||
const backResult = window.__clickTabByKey(BACK_KEY);
|
||||
window.__PROBE_EVENT('BACK_' + i + '_RES:' + backResult.slice(0, 50));
|
||||
await sleep(DWELL_MS);
|
||||
}
|
||||
window.__PROBE_EVENT('SWITCH_LOOP_DONE');
|
||||
} finally {
|
||||
window.__SWITCH_LOOP_RUNNING = false;
|
||||
}
|
||||
})();
|
||||
|
||||
return 'switch loop kicked off (BACK=' + BACK_KEY + ', AWAY=' + AWAY_KEY + ')';
|
||||
})();
|
||||
@@ -0,0 +1,113 @@
|
||||
// Shared types between the in-browser probe and the Node-side analyzer.
|
||||
// Kept tiny on purpose — anything the analyzer can re-derive is left off.
|
||||
|
||||
export interface ProbeStreamEvent {
|
||||
/** Summarized payload — long strings truncated, arrays printed as Array(N) */
|
||||
data?: Record<string, unknown>;
|
||||
/** Keys present on the event's `data` payload — useful at a glance */
|
||||
dataKeys?: string[];
|
||||
/** ServerMessage.id — gateway WS frames carry an event-id we may resume from */
|
||||
eventId?: string | null;
|
||||
message?: string;
|
||||
/** Last 10 chars of the operationId (full id is excessively long) */
|
||||
opIdTail: string;
|
||||
raw?: string;
|
||||
/** Raw frame byte length, when applicable */
|
||||
rawLen?: number;
|
||||
/** For non-agent_event server frames (auth_success, heartbeat_ack, …) */
|
||||
serverType?: string;
|
||||
sseEvent?: string;
|
||||
status?: number;
|
||||
stepIndex?: number;
|
||||
/** Milliseconds since the probe's t0 (install time). */
|
||||
t: number;
|
||||
/** 'ws' for gateway WebSocket frames, 'sse' for direct /api/agent/stream */
|
||||
transport: 'ws' | 'sse';
|
||||
/** Either the AgentStreamEvent.type, or a probe sentinel like `_WS_OPEN_` */
|
||||
type: string;
|
||||
url?: string;
|
||||
}
|
||||
|
||||
export interface ProbeActionCall {
|
||||
args?: {
|
||||
count?: number;
|
||||
context?: unknown;
|
||||
params?: unknown;
|
||||
};
|
||||
error?: string;
|
||||
/** `replaceMessages` / `refreshMessages` / `MARK:<label>` / `_WRAP_ERROR_` */
|
||||
name: string;
|
||||
stack?: string;
|
||||
t: number;
|
||||
}
|
||||
|
||||
export interface ProbeMessageSummary {
|
||||
/** children.length */
|
||||
chN: number;
|
||||
/** content.length */
|
||||
cLen: number;
|
||||
/** Last 8 chars of the message id */
|
||||
id: string;
|
||||
/** reasoning.content.length */
|
||||
rLen: number;
|
||||
role: string;
|
||||
/** tools.length */
|
||||
tools: number;
|
||||
}
|
||||
|
||||
export interface ProbeTimelineSample {
|
||||
/** Last 10 chars of activeTopicId, or null */
|
||||
activeTopic: string | null;
|
||||
/** Per-key breakdown: display count, db count, message summaries */
|
||||
byKey: Record<
|
||||
string,
|
||||
{
|
||||
n: number;
|
||||
dbN: number;
|
||||
msgs: ProbeMessageSummary[];
|
||||
}
|
||||
>;
|
||||
err?: string;
|
||||
/** All messagesMap keys that have content at this moment */
|
||||
keys: string[];
|
||||
/** Number of operations in 'running' status */
|
||||
runOps: number;
|
||||
t: number;
|
||||
}
|
||||
|
||||
export interface ProbeDumpMeta {
|
||||
callCount: number;
|
||||
/** Date.now() at dump call */
|
||||
collectedAt: number;
|
||||
eventCount: number;
|
||||
sampleCount: number;
|
||||
/** Date.now() at probe install */
|
||||
t0: number;
|
||||
}
|
||||
|
||||
export interface ProbeDump {
|
||||
actionCalls: ProbeActionCall[];
|
||||
meta: ProbeDumpMeta;
|
||||
streamEvents: ProbeStreamEvent[];
|
||||
timeline: ProbeTimelineSample[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Globals the probe attaches to `window`. Keeps `as any` casts at the boundary
|
||||
* instead of sprinkling them through the probe body.
|
||||
*/
|
||||
declare global {
|
||||
interface Window {
|
||||
__clickTabByKey?: (key: string) => string;
|
||||
__listTabs?: () => Array<{ i: number; key: string | null; active: boolean; title: string }>;
|
||||
__LOBE_STORES?: Record<string, () => any>;
|
||||
__PROBE_ACTION_CALLS?: ProbeActionCall[];
|
||||
__PROBE_EVENT?: (label: string) => void;
|
||||
__PROBE_MSG_TIMELINE?: ProbeTimelineSample[];
|
||||
__PROBE_ORIG_FETCH?: typeof fetch;
|
||||
__PROBE_ORIG_WEBSOCKET?: typeof WebSocket;
|
||||
__PROBE_STREAM_EVENTS?: ProbeStreamEvent[];
|
||||
__PROBE_T0?: number;
|
||||
__PROBE_TIMELINE_TIMER?: ReturnType<typeof setInterval> | null;
|
||||
}
|
||||
}
|
||||
@@ -28,6 +28,9 @@ prd
|
||||
# Recordings
|
||||
.records/
|
||||
|
||||
# Agent-gateway probe captures (local debugging dumps)
|
||||
.agent-gateway/
|
||||
|
||||
# Temporary files
|
||||
.temp/
|
||||
temp/
|
||||
|
||||
@@ -172,36 +172,25 @@ export default class GatewayConnectionCtr extends ControllerModule {
|
||||
request: AgentRunRequestMessage,
|
||||
): Promise<{ reason?: string; status: 'accepted' | 'rejected' }> {
|
||||
try {
|
||||
const ctr = this.heterogeneousAgentCtr;
|
||||
const serverUrl = await this.remoteServerConfigCtr.getRemoteServerUrl();
|
||||
if (!serverUrl) {
|
||||
return { reason: 'Remote server URL not configured', status: 'rejected' };
|
||||
}
|
||||
|
||||
// Map agentType to binary name.
|
||||
// claude-code → `claude` CLI; all other platforms use their type name as the binary.
|
||||
const command = request.agentType === 'claude-code' ? 'claude' : request.agentType;
|
||||
|
||||
// Create a session for the hetero agent.
|
||||
const { sessionId } = await ctr.startSession({
|
||||
// Fire-and-forget: lh hetero exec handles spawn -> adapt ->
|
||||
// BatchIngester -> heteroIngest/heteroFinish -> server -> Gateway -> clients.
|
||||
// Same command as spawnHeteroSandbox() on the server side.
|
||||
this.heterogeneousAgentCtr.spawnLhHeteroExec({
|
||||
agentType: request.agentType,
|
||||
args: [],
|
||||
command,
|
||||
cwd: request.cwd,
|
||||
// Inject LOBEHUB_JWT so the CLI authenticates against heteroIngest.
|
||||
env: { LOBEHUB_JWT: request.jwt },
|
||||
jwt: request.jwt,
|
||||
operationId: request.operationId,
|
||||
prompt: request.prompt,
|
||||
resumeSessionId: request.resumeSessionId,
|
||||
serverUrl,
|
||||
topicId: request.topicId,
|
||||
});
|
||||
|
||||
// Fire-and-forget: sendPrompt runs the CLI until completion.
|
||||
ctr
|
||||
.sendPrompt({
|
||||
operationId: request.operationId,
|
||||
prompt: request.prompt,
|
||||
sessionId,
|
||||
})
|
||||
.catch((err: Error) => {
|
||||
// Errors are surfaced via heteroFinish on the server side.
|
||||
// Log locally for desktop debugging only.
|
||||
console.error('[GatewayConnectionCtr] agent run failed:', err.message);
|
||||
});
|
||||
|
||||
return { status: 'accepted' };
|
||||
} catch (err) {
|
||||
const reason = err instanceof Error ? err.message : String(err);
|
||||
|
||||
@@ -1251,4 +1251,69 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
|
||||
process.on('SIGTERM', onSignal);
|
||||
process.on('SIGINT', onSignal);
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawn `lh hetero exec` for gateway-driven agent runs.
|
||||
* The `lh` CLI handles everything downstream — no local
|
||||
* AgentStreamPipeline or IPC broadcast needed. Mirrors
|
||||
* `spawnHeteroSandbox()` on the server side.
|
||||
*/
|
||||
spawnLhHeteroExec(params: {
|
||||
agentType: string;
|
||||
cwd?: string;
|
||||
jwt: string;
|
||||
operationId: string;
|
||||
prompt: string;
|
||||
resumeSessionId?: string;
|
||||
serverUrl: string;
|
||||
topicId: string;
|
||||
}): void {
|
||||
const { agentType, cwd, jwt, operationId, prompt, resumeSessionId, serverUrl, topicId } =
|
||||
params;
|
||||
const workDir = cwd ?? process.cwd();
|
||||
|
||||
const args = [
|
||||
'hetero',
|
||||
'exec',
|
||||
'--type',
|
||||
agentType,
|
||||
'--operation-id',
|
||||
operationId,
|
||||
'--topic',
|
||||
topicId,
|
||||
'--render',
|
||||
'none',
|
||||
'--input-json',
|
||||
'-',
|
||||
'--cwd',
|
||||
workDir,
|
||||
...(resumeSessionId ? ['--resume', resumeSessionId] : []),
|
||||
];
|
||||
|
||||
const env = {
|
||||
...process.env,
|
||||
...buildProxyEnv(this.app.storeManager.get('networkProxy')),
|
||||
LOBEHUB_JWT: jwt,
|
||||
LOBEHUB_SERVER: serverUrl,
|
||||
};
|
||||
|
||||
logger.info('spawnLhHeteroExec: type=%s op=%s topic=%s', agentType, operationId, topicId);
|
||||
|
||||
const child = spawn('lh', args, {
|
||||
cwd: workDir,
|
||||
env,
|
||||
stdio: ['pipe', 'inherit', 'inherit'],
|
||||
});
|
||||
|
||||
child.stdin.write(JSON.stringify(prompt));
|
||||
child.stdin.end();
|
||||
|
||||
child.on('error', (err) => {
|
||||
logger.error('spawnLhHeteroExec: spawn failed — %s', err.message);
|
||||
});
|
||||
|
||||
child.on('exit', (code, signal) => {
|
||||
logger.info('spawnLhHeteroExec: exited — op=%s code=%s signal=%s', operationId, code, signal);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -200,11 +200,13 @@ const mockShellCommandCtr = {
|
||||
|
||||
const mockHeterogeneousAgentCtr = {
|
||||
sendPrompt: vi.fn().mockResolvedValue(undefined),
|
||||
spawnLhHeteroExec: vi.fn(),
|
||||
startSession: vi.fn().mockResolvedValue({ sessionId: 'mock-session-id' }),
|
||||
} as unknown as HeterogeneousAgentCtr;
|
||||
|
||||
const mockRemoteServerConfigCtr = {
|
||||
getAccessToken: vi.fn().mockResolvedValue('mock-access-token'),
|
||||
getRemoteServerUrl: vi.fn().mockResolvedValue('https://server.example.com'),
|
||||
isRemoteServerConfigured: vi.fn().mockResolvedValue(true),
|
||||
refreshAccessToken: vi.fn().mockResolvedValue({ success: true }),
|
||||
} as unknown as RemoteServerConfigCtr;
|
||||
@@ -631,26 +633,23 @@ describe('GatewayConnectionCtr', () => {
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.mocked(mockHeterogeneousAgentCtr.startSession).mockClear();
|
||||
vi.mocked(mockHeterogeneousAgentCtr.sendPrompt).mockClear();
|
||||
vi.mocked(mockHeterogeneousAgentCtr.spawnLhHeteroExec).mockClear();
|
||||
});
|
||||
|
||||
it.each([
|
||||
['openclaw', 'openclaw'],
|
||||
['hermes', 'hermes'],
|
||||
['codex', 'codex'],
|
||||
['claude-code', 'claude'],
|
||||
] as const)('uses command "%s" for agentType "%s"', async (agentType, expectedCommand) => {
|
||||
const client = await connectAndOpen();
|
||||
client.simulateAgentRunRequest(agentType);
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
it.each(['openclaw', 'hermes', 'codex', 'claude-code'] as const)(
|
||||
'forwards agentType "%s" to spawnLhHeteroExec',
|
||||
async (agentType) => {
|
||||
const client = await connectAndOpen();
|
||||
client.simulateAgentRunRequest(agentType);
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
|
||||
expect(mockHeterogeneousAgentCtr.startSession).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ agentType, command: expectedCommand }),
|
||||
);
|
||||
});
|
||||
expect(mockHeterogeneousAgentCtr.spawnLhHeteroExec).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ agentType }),
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
it('sends accepted ack and fires sendPrompt', async () => {
|
||||
it('sends accepted ack and spawns lh hetero exec', async () => {
|
||||
const client = await connectAndOpen();
|
||||
client.simulateAgentRunRequest('openclaw', 'op-xyz');
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
@@ -659,15 +658,37 @@ describe('GatewayConnectionCtr', () => {
|
||||
operationId: 'op-xyz',
|
||||
status: 'accepted',
|
||||
});
|
||||
expect(mockHeterogeneousAgentCtr.sendPrompt).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ operationId: 'op-xyz', sessionId: 'mock-session-id' }),
|
||||
expect(mockHeterogeneousAgentCtr.spawnLhHeteroExec).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
agentType: 'openclaw',
|
||||
jwt: 'mock-jwt',
|
||||
operationId: 'op-xyz',
|
||||
prompt: 'hello',
|
||||
serverUrl: 'https://server.example.com',
|
||||
topicId: 'topic-1',
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('sends rejected ack when startSession throws', async () => {
|
||||
vi.mocked(mockHeterogeneousAgentCtr.startSession).mockRejectedValueOnce(
|
||||
new Error('binary not found'),
|
||||
);
|
||||
it('sends rejected ack when remote server URL is not configured', async () => {
|
||||
vi.mocked(mockRemoteServerConfigCtr.getRemoteServerUrl).mockResolvedValueOnce('');
|
||||
|
||||
const client = await connectAndOpen();
|
||||
client.simulateAgentRunRequest('openclaw', 'op-fail');
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
|
||||
expect(client.sendAgentRunAck).toHaveBeenCalledWith({
|
||||
operationId: 'op-fail',
|
||||
reason: 'Remote server URL not configured',
|
||||
status: 'rejected',
|
||||
});
|
||||
expect(mockHeterogeneousAgentCtr.spawnLhHeteroExec).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('sends rejected ack when spawnLhHeteroExec throws', async () => {
|
||||
vi.mocked(mockHeterogeneousAgentCtr.spawnLhHeteroExec).mockImplementationOnce(() => {
|
||||
throw new Error('binary not found');
|
||||
});
|
||||
|
||||
const client = await connectAndOpen();
|
||||
client.simulateAgentRunRequest('openclaw', 'op-fail');
|
||||
|
||||
@@ -34,6 +34,9 @@ export const DEFAULT_AGENT_CHAT_CONFIG: LobeAgentChatConfig = {
|
||||
reasoningBudgetToken: 1024,
|
||||
searchFCModel: DEFAULT_AGENT_SEARCH_FC_MODEL,
|
||||
searchMode: 'auto',
|
||||
selfIteration: {
|
||||
enabled: false,
|
||||
},
|
||||
};
|
||||
|
||||
export const DEFAULT_AGENT_CONFIG: LobeAgentConfig = {
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import { DEFAULT_AGENT_CONFIG } from '@/const/settings';
|
||||
|
||||
import { type Store } from './action';
|
||||
import { selectors } from './selectors';
|
||||
|
||||
describe('AgentSetting selectors', () => {
|
||||
describe('currentChatConfig', () => {
|
||||
it('should include disabled self iteration by default', () => {
|
||||
const state = {
|
||||
config: DEFAULT_AGENT_CONFIG,
|
||||
} as Store;
|
||||
|
||||
expect(selectors.currentChatConfig(state).selfIteration).toEqual({ enabled: false });
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -7,6 +7,7 @@ import { useFetchAgentDocuments } from '@/hooks/useFetchAgentDocuments';
|
||||
import { useFetchTopicMemories } from '@/hooks/useFetchMemoryForTopic';
|
||||
import { useFetchNotebookDocuments } from '@/hooks/useFetchNotebookDocuments';
|
||||
import { useChatStore } from '@/store/chat';
|
||||
import { operationSelectors } from '@/store/chat/selectors';
|
||||
import { featureFlagsSelectors, useServerConfigStore } from '@/store/serverConfig';
|
||||
import { useUserStore } from '@/store/user';
|
||||
import { settingsSelectors } from '@/store/user/selectors';
|
||||
@@ -84,8 +85,14 @@ const ChatList = memo<ChatListProps>(
|
||||
s.useFetchMessages,
|
||||
]);
|
||||
const activeAgentId = useChatStore((s) => s.activeAgentId);
|
||||
// Suppress SWR focus revalidate while the current topic is streaming —
|
||||
// the server-pushed UIChatMessage[] snapshot at step boundaries is the
|
||||
// source of truth during that window. A focus refetch could hit DB
|
||||
// mid-fan-out and clobber the in-memory streamed state with a stale
|
||||
// assistant placeholder.
|
||||
const isStreaming = useChatStore(operationSelectors.isAgentRuntimeRunningByContext(context));
|
||||
const { enableAgentSelfIteration } = useServerConfigStore(featureFlagsSelectors);
|
||||
useFetchMessages(context, skipFetch);
|
||||
useFetchMessages(context, { revalidateOnFocus: !isStreaming, skipFetch });
|
||||
const displayMessages = useConversationStore(dataSelectors.displayMessages);
|
||||
const displayMessageIds = useConversationStore(dataSelectors.displayMessageIds);
|
||||
const latestMessageId = displayMessageIds.at(-1);
|
||||
|
||||
+19
-1
@@ -9,7 +9,9 @@ import ContentBlocksScroll from './ContentBlocksScroll';
|
||||
import type { RenderableAssistantContentBlock } from './types';
|
||||
|
||||
vi.mock('@lobehub/ui', () => ({
|
||||
Flexbox: ({ children }: { children?: ReactNode }) => <div>{children}</div>,
|
||||
Flexbox: ({ children, gap }: { children?: ReactNode; gap?: number }) => (
|
||||
<div data-gap={gap}>{children}</div>
|
||||
),
|
||||
ScrollArea: ({ children }: { children?: ReactNode }) => <div>{children}</div>,
|
||||
}));
|
||||
|
||||
@@ -62,4 +64,20 @@ describe('ContentBlocksScroll', () => {
|
||||
'true',
|
||||
);
|
||||
});
|
||||
|
||||
it('uses a consistent gap between workflow blocks', () => {
|
||||
const { container } = render(
|
||||
<ContentBlocksScroll
|
||||
assistantId="assistant-1"
|
||||
blocks={[
|
||||
{ content: 'first workflow block', id: 'block-1' },
|
||||
{ content: 'second workflow block', id: 'block-2' },
|
||||
]}
|
||||
scroll={false}
|
||||
variant="workflow"
|
||||
/>,
|
||||
);
|
||||
|
||||
expect(container.querySelector('[data-gap="8"]')).toBeInTheDocument();
|
||||
});
|
||||
});
|
||||
|
||||
+1
-1
@@ -68,7 +68,7 @@ const ContentBlocksScroll = memo<ContentBlocksScrollProps>((props) => {
|
||||
}, [assistantIdFromProps, blocksFromProps, messagesList]);
|
||||
|
||||
const list = (
|
||||
<Flexbox>
|
||||
<Flexbox gap={variant === 'workflow' ? 8 : undefined}>
|
||||
{blocks.map((block) => (
|
||||
<ContentBlock
|
||||
key={block.renderKey ?? block.id}
|
||||
|
||||
@@ -76,7 +76,7 @@ const ClientTaskItem = memo<ClientTaskItemProps>(({ item }) => {
|
||||
);
|
||||
|
||||
// Fetch thread messages (skip when executing - messages come from real-time updates)
|
||||
useFetchMessages(threadContext, isProcessing);
|
||||
useFetchMessages(threadContext, { skipFetch: isProcessing });
|
||||
|
||||
// Get thread messages from store using selector
|
||||
const threadMessages = useChatStore((s) =>
|
||||
|
||||
@@ -59,7 +59,7 @@ export const useClientTaskStats = ({
|
||||
);
|
||||
|
||||
// Fetch thread messages (skip when disabled or no threadId)
|
||||
useFetchMessages(threadContext, !enabled || !threadId);
|
||||
useFetchMessages(threadContext, { skipFetch: !enabled || !threadId });
|
||||
|
||||
// Get thread messages from store using selector
|
||||
const threadMessages = useChatStore((s) =>
|
||||
|
||||
@@ -54,7 +54,7 @@ const ClientTaskDetail = memo<ClientTaskDetailProps>(
|
||||
);
|
||||
|
||||
// Fetch thread messages (skip when executing - messages come from real-time updates)
|
||||
useFetchMessages(threadContext, isExecuting);
|
||||
useFetchMessages(threadContext, { skipFetch: isExecuting });
|
||||
|
||||
// Get thread messages from store using selector
|
||||
const threadMessages = useChatStore((s) =>
|
||||
|
||||
@@ -61,7 +61,7 @@ const ClientTaskItem = memo<ClientTaskItemProps>(({ item }) => {
|
||||
);
|
||||
|
||||
// Fetch thread messages (skip when executing - messages come from real-time updates)
|
||||
useFetchMessages(threadContext, isProcessing);
|
||||
useFetchMessages(threadContext, { skipFetch: isProcessing });
|
||||
|
||||
// Get thread messages from store using selector
|
||||
const threadMessages = useChatStore((s) =>
|
||||
|
||||
@@ -59,7 +59,7 @@ export const useClientTaskStats = ({
|
||||
);
|
||||
|
||||
// Fetch thread messages (skip when disabled or no threadId)
|
||||
useFetchMessages(threadContext, !enabled || !threadId);
|
||||
useFetchMessages(threadContext, { skipFetch: !enabled || !threadId });
|
||||
|
||||
// Get thread messages from store using selector
|
||||
const threadMessages = useChatStore((s) =>
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
import { HETEROGENEOUS_TYPE_LABELS } from '@lobechat/heterogeneous-agents';
|
||||
import {
|
||||
HETEROGENEOUS_TYPE_LABELS,
|
||||
isRemoteHeterogeneousType,
|
||||
} from '@lobechat/heterogeneous-agents';
|
||||
import { type ModelPerformance, type ModelUsage } from '@lobechat/types';
|
||||
import { ModelIcon } from '@lobehub/icons';
|
||||
import { Center, Flexbox } from '@lobehub/ui';
|
||||
@@ -33,7 +36,14 @@ const Usage = memo<UsageProps>(({ model, usage, performance, provider }) => {
|
||||
|
||||
if (!isDev && onboardingAgentId && conversationAgentId === onboardingAgentId) return null;
|
||||
|
||||
const heteroName = provider ? HETEROGENEOUS_TYPE_LABELS[provider] : undefined;
|
||||
// Only remote platform agents (openclaw, hermes) replace the model name with
|
||||
// the brand label — they don't expose a real model id. Local CLI agents
|
||||
// (claude-code, codex) report their actual model on `turn_metadata` and
|
||||
// should keep showing it.
|
||||
const heteroName =
|
||||
provider && isRemoteHeterogeneousType(provider)
|
||||
? HETEROGENEOUS_TYPE_LABELS[provider]
|
||||
: undefined;
|
||||
|
||||
return (
|
||||
<Flexbox
|
||||
|
||||
@@ -6,6 +6,8 @@ import { type StateCreator } from 'zustand/vanilla';
|
||||
|
||||
import { useClientDataSWRWithSync } from '@/libs/swr';
|
||||
import { messageService } from '@/services/message';
|
||||
import { getChatStoreState } from '@/store/chat';
|
||||
import { operationSelectors } from '@/store/chat/selectors';
|
||||
import { messageMapKey } from '@/store/chat/utils/messageMapKey';
|
||||
|
||||
import { type Store as ConversationStore } from '../../action';
|
||||
@@ -67,14 +69,17 @@ export interface DataAction {
|
||||
switchMessageBranch: (messageId: string, branchIndex: number) => Promise<void>;
|
||||
|
||||
/**
|
||||
* Fetch messages for this conversation using SWR
|
||||
* Fetch messages for this conversation using SWR.
|
||||
*
|
||||
* @param context - Conversation context with sessionId and topicId
|
||||
* @param skipFetch - When true, SWR key is null and no fetch occurs
|
||||
* @param options.skipFetch - When true, SWR key is null and no fetch occurs
|
||||
* @param options.revalidateOnFocus - Override SWR's default focus revalidate.
|
||||
* Pass `false` while a streaming flow owns the in-memory message state so
|
||||
* a focus refetch doesn't clobber it with a stale DB snapshot.
|
||||
*/
|
||||
useFetchMessages: (
|
||||
context: ConversationContext,
|
||||
skipFetch?: boolean,
|
||||
options?: { revalidateOnFocus?: boolean; skipFetch?: boolean },
|
||||
) => SWRResponse<UIChatMessage[]>;
|
||||
}
|
||||
|
||||
@@ -184,7 +189,8 @@ export const dataSlice: StateCreator<
|
||||
await state.updateMessageMetadata(message.parentId, { activeBranchIndex: branchIndex });
|
||||
},
|
||||
|
||||
useFetchMessages: (context, skipFetch) => {
|
||||
useFetchMessages: (context, options) => {
|
||||
const { skipFetch, revalidateOnFocus } = options ?? {};
|
||||
// When skipFetch is true, SWR key is null - no fetch occurs
|
||||
// This is used when external messages are provided (e.g., creating new thread)
|
||||
// Also skip fetch when topicId is null (new conversation state) - there's no server data,
|
||||
@@ -206,10 +212,27 @@ export const dataSlice: StateCreator<
|
||||
|
||||
() => messageService.getMessages(context),
|
||||
{
|
||||
...(revalidateOnFocus !== undefined && { revalidateOnFocus }),
|
||||
onData: (data) => {
|
||||
if (!data) return;
|
||||
if (!context.topicId) return;
|
||||
|
||||
// Defense-in-depth gate (LOBE-9501): drop any SWR onData while the
|
||||
// topic is streaming. DB fan-out for chunk writes is async and lags
|
||||
// the WS push by anywhere from 100ms to several seconds; an SWR
|
||||
// refetch that lands inside that window returns the assistant row
|
||||
// as the LOADING_FLAT placeholder (cLen=3) and would collapse the
|
||||
// in-memory streamed content. SWR's own cache still receives the
|
||||
// value, so once streaming ends a normal revalidate writes through.
|
||||
//
|
||||
// This is the catch-all backstop sitting BELOW the SoT consumption
|
||||
// in gatewayEventHandler — `mergeFetchedMessagesWithLocalState`'s
|
||||
// updatedAt tie-breaker handles most cases on its own, but the
|
||||
// updatedAt comparison degenerates when server's pushed snapshot
|
||||
// carries a DB updatedAt equal to a later stale fetch's row.
|
||||
if (operationSelectors.isAgentRuntimeRunningByContext(context)(getChatStoreState()))
|
||||
return;
|
||||
|
||||
const prevDbMessages = get().dbMessages;
|
||||
const mergedMessages = mergeFetchedMessagesWithLocalState(data, prevDbMessages);
|
||||
const storeContextKey = messageMapKey(get().context);
|
||||
|
||||
@@ -64,7 +64,7 @@ const ShareDataProvider = memo<PropsWithChildren<ShareDataProviderProps>>(
|
||||
}, [activeAgentId, activeGroupId, activeThreadId, activeTopicId, context]);
|
||||
|
||||
const shouldSkipFetch = !resolvedContext.agentId || !resolvedContext.topicId;
|
||||
const { isLoading } = useFetchMessages(resolvedContext, shouldSkipFetch);
|
||||
const { isLoading } = useFetchMessages(resolvedContext, { skipFetch: shouldSkipFetch });
|
||||
|
||||
const messageKey = useMemo(() => {
|
||||
if (!resolvedContext.agentId) return undefined;
|
||||
|
||||
@@ -18,6 +18,8 @@ import { LOBE_THEME_NEUTRAL_COLOR, LOBE_THEME_PRIMARY_COLOR } from '@/const/them
|
||||
import { isDesktop } from '@/const/version';
|
||||
import { useIsDark } from '@/hooks/useIsDark';
|
||||
import { getUILocaleAndResources } from '@/libs/getUILocaleAndResources';
|
||||
import type { UILocaleResources } from '@/libs/getUILocaleAndResources.utils';
|
||||
import { resolveUILocale } from '@/libs/getUILocaleAndResources.utils';
|
||||
import Image from '@/libs/next/Image';
|
||||
import { useGlobalStore } from '@/store/global';
|
||||
import { systemStatusSelectors } from '@/store/global/selectors';
|
||||
@@ -115,20 +117,22 @@ const AppTheme = memo<AppThemeProps>(
|
||||
[messageTop],
|
||||
);
|
||||
|
||||
const [uiResources, setUIResources] = useState<any>(null);
|
||||
const uiLocale = useMemo(() => {
|
||||
if (language.startsWith('zh')) return 'zh-CN';
|
||||
if (language.startsWith('en')) return 'en-US';
|
||||
return 'en-US';
|
||||
}, [language]);
|
||||
const [uiResources, setUIResources] = useState<UILocaleResources>();
|
||||
const [uiLocale, setUILocale] = useState(() => resolveUILocale(language).uiLocale);
|
||||
|
||||
useEffect(() => {
|
||||
let mounted = true;
|
||||
getUILocaleAndResources(language).then(({ resources }) => {
|
||||
if (mounted) {
|
||||
setUIResources(resources);
|
||||
}
|
||||
});
|
||||
setUILocale(resolveUILocale(language).uiLocale);
|
||||
getUILocaleAndResources(language)
|
||||
.then(({ locale, resources }) => {
|
||||
if (mounted) {
|
||||
setUILocale(locale);
|
||||
setUIResources(resources);
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
console.error('Failed to load UI locale resources:', error);
|
||||
});
|
||||
return () => {
|
||||
mounted = false;
|
||||
};
|
||||
|
||||
@@ -1,24 +1,23 @@
|
||||
import { en, zhCn } from '@lobehub/ui/es/i18n/resources/index';
|
||||
|
||||
import { normalizeLocale } from '@/locales/resources';
|
||||
|
||||
type UILocaleResources = Record<string, Record<string, string>>;
|
||||
import type { UILocaleResourceInput, UILocaleResources } from './getUILocaleAndResources.utils';
|
||||
import {
|
||||
mergeUILocaleResources,
|
||||
normalizeUILocaleResources,
|
||||
resolveUILocale,
|
||||
} from './getUILocaleAndResources.utils';
|
||||
|
||||
// eager: true — UI locale fully inlined at build time
|
||||
const uiLocaleModules = import.meta.glob<{ default: UILocaleResources }>('/locales/*/ui.json', {
|
||||
const uiLocaleModules = import.meta.glob<{ default: UILocaleResourceInput }>('/locales/*/ui.json', {
|
||||
eager: true,
|
||||
});
|
||||
|
||||
const getUILocale = (locale: string): string => {
|
||||
if (locale.startsWith('zh')) return 'zh-CN';
|
||||
if (locale.startsWith('en')) return 'en-US';
|
||||
return locale;
|
||||
};
|
||||
|
||||
const loadBusinessResources = (locale: string): UILocaleResources | null => {
|
||||
const key = `/locales/${locale}/ui.json`;
|
||||
const mod = uiLocaleModules[key];
|
||||
return mod ? (mod.default as UILocaleResources) : null;
|
||||
const resources = mod?.default as UILocaleResourceInput | null | undefined;
|
||||
|
||||
return resources ? normalizeUILocaleResources(resources) : null;
|
||||
};
|
||||
|
||||
const loadLobeUIBuiltinResources = (locale: string): UILocaleResources | null => {
|
||||
@@ -29,15 +28,14 @@ const loadLobeUIBuiltinResources = (locale: string): UILocaleResources | null =>
|
||||
export const getUILocaleAndResources = async (
|
||||
locale: string | 'auto',
|
||||
): Promise<{ locale: string; resources: UILocaleResources }> => {
|
||||
const effectiveLocale = locale === 'auto' ? 'en-US' : locale;
|
||||
const normalizedLocale = normalizeLocale(effectiveLocale);
|
||||
const uiLocale = getUILocale(normalizedLocale);
|
||||
const { normalizedLocale, uiLocale } = resolveUILocale(locale);
|
||||
|
||||
const resources =
|
||||
loadBusinessResources(normalizedLocale) ??
|
||||
loadLobeUIBuiltinResources(normalizedLocale) ??
|
||||
loadBusinessResources('en-US') ??
|
||||
loadLobeUIBuiltinResources('en-US');
|
||||
mergeUILocaleResources(
|
||||
loadLobeUIBuiltinResources(normalizedLocale),
|
||||
loadBusinessResources(normalizedLocale),
|
||||
) ??
|
||||
mergeUILocaleResources(loadLobeUIBuiltinResources('en-US'), loadBusinessResources('en-US'));
|
||||
|
||||
if (!resources)
|
||||
throw new Error(
|
||||
|
||||
@@ -2,6 +2,11 @@ import { describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { getUILocaleAndResources } from './getUILocaleAndResources';
|
||||
|
||||
const translateFromUILocaleResources = (
|
||||
resources: Record<string, Record<string, string>>,
|
||||
key: string,
|
||||
) => Object.assign({}, ...Object.values(resources))[key];
|
||||
|
||||
describe('getUILocaleAndResources', () => {
|
||||
it('should return zh-CN locale and zhCn resources for zh-CN', async () => {
|
||||
const result = await getUILocaleAndResources('zh-CN');
|
||||
@@ -9,6 +14,30 @@ describe('getUILocaleAndResources', () => {
|
||||
expect(result.resources).toBeDefined();
|
||||
});
|
||||
|
||||
it('should normalize business ui.json into a @lobehub/ui consumable resource map', async () => {
|
||||
const result = await getUILocaleAndResources('zh-CN');
|
||||
|
||||
expect(translateFromUILocaleResources(result.resources, 'form.submit')).toBe('提交');
|
||||
});
|
||||
|
||||
it('should merge built-in resources with partial business ui.json resources', async () => {
|
||||
const result = await getUILocaleAndResources('zh-CN');
|
||||
|
||||
expect(translateFromUILocaleResources(result.resources, 'image.copy')).toBe('复制');
|
||||
expect(translateFromUILocaleResources(result.resources, 'hotkey.clear')).toBe('清除绑定');
|
||||
expect(translateFromUILocaleResources(result.resources, 'form.submit')).toBe('提交');
|
||||
});
|
||||
|
||||
it('should merge en built-in fallback resources for non-en/zh partial business ui.json resources', async () => {
|
||||
const result = await getUILocaleAndResources('de-DE');
|
||||
|
||||
expect(result.locale).toBe('de-DE');
|
||||
expect(translateFromUILocaleResources(result.resources, 'image.copy')).toBe('Copy');
|
||||
expect(translateFromUILocaleResources(result.resources, 'hotkey.clear')).toBe('Clear binding');
|
||||
expect(translateFromUILocaleResources(result.resources, 'common.empty')).toBe('(empty)');
|
||||
expect(translateFromUILocaleResources(result.resources, 'form.submit')).toBe('Absenden');
|
||||
});
|
||||
|
||||
it('should return zh-CN locale and zhCn resources for zh-TW', async () => {
|
||||
const result = await getUILocaleAndResources('zh-TW');
|
||||
expect(result.locale).toBe('zh-CN');
|
||||
@@ -27,10 +56,18 @@ describe('getUILocaleAndResources', () => {
|
||||
expect(result.resources).toBeDefined();
|
||||
});
|
||||
|
||||
it('should return en-US locale and en resources for auto', async () => {
|
||||
const result = await getUILocaleAndResources('auto');
|
||||
expect(result.locale).toBe('en-US');
|
||||
expect(result.resources).toBeDefined();
|
||||
it('should resolve auto from the current document language', async () => {
|
||||
const previousLang = document.documentElement.lang;
|
||||
document.documentElement.lang = 'zh-CN';
|
||||
|
||||
try {
|
||||
const result = await getUILocaleAndResources('auto');
|
||||
|
||||
expect(result.locale).toBe('zh-CN');
|
||||
expect(translateFromUILocaleResources(result.resources, 'form.submit')).toBe('提交');
|
||||
} finally {
|
||||
document.documentElement.lang = previousLang;
|
||||
}
|
||||
});
|
||||
|
||||
it('should return ar locale and custom resources for ar', async () => {
|
||||
|
||||
@@ -1,17 +1,16 @@
|
||||
import { normalizeLocale } from '@/locales/resources';
|
||||
|
||||
type UILocaleResources = Record<string, Record<string, string>>;
|
||||
|
||||
const getUILocale = (locale: string): string => {
|
||||
if (locale.startsWith('zh')) return 'zh-CN';
|
||||
if (locale.startsWith('en')) return 'en-US';
|
||||
return locale;
|
||||
};
|
||||
import type { UILocaleResourceInput, UILocaleResources } from './getUILocaleAndResources.utils';
|
||||
import {
|
||||
mergeUILocaleResources,
|
||||
normalizeUILocaleResources,
|
||||
resolveUILocale,
|
||||
} from './getUILocaleAndResources.utils';
|
||||
|
||||
const loadBusinessResources = async (locale: string): Promise<UILocaleResources | null> => {
|
||||
try {
|
||||
const resourcesModule = await import(`@/../locales/${locale}/ui.json`);
|
||||
return resourcesModule.default as UILocaleResources;
|
||||
const resources = resourcesModule.default as UILocaleResourceInput | null;
|
||||
|
||||
return resources ? normalizeUILocaleResources(resources) : null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
@@ -31,19 +30,17 @@ const loadLobeUIBuiltinResources = async (locale: string): Promise<UILocaleResou
|
||||
export const getUILocaleAndResources = async (
|
||||
locale: string | 'auto',
|
||||
): Promise<{ locale: string; resources: UILocaleResources }> => {
|
||||
const effectiveLocale = locale === 'auto' ? 'en-US' : locale;
|
||||
const normalizedLocale = normalizeLocale(effectiveLocale);
|
||||
const uiLocale = getUILocale(normalizedLocale);
|
||||
const { normalizedLocale, uiLocale } = resolveUILocale(locale);
|
||||
|
||||
// Priority:
|
||||
// 1) business-defined ui.json
|
||||
// 2) @lobehub/ui built-in resources (en/zh)
|
||||
// 3) fallback to default en
|
||||
const resources =
|
||||
(await loadBusinessResources(normalizedLocale)) ??
|
||||
(await loadLobeUIBuiltinResources(normalizedLocale)) ??
|
||||
(await loadBusinessResources('en-US')) ??
|
||||
(await loadLobeUIBuiltinResources('en-US'));
|
||||
mergeUILocaleResources(
|
||||
await loadLobeUIBuiltinResources(normalizedLocale),
|
||||
await loadBusinessResources(normalizedLocale),
|
||||
) ??
|
||||
mergeUILocaleResources(
|
||||
await loadLobeUIBuiltinResources('en-US'),
|
||||
await loadBusinessResources('en-US'),
|
||||
);
|
||||
|
||||
if (!resources)
|
||||
throw new Error(
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
import { DEFAULT_LANG } from '@/const/locale';
|
||||
import { normalizeLocale } from '@/locales/resources';
|
||||
|
||||
export type UILocaleResourceBundle = Record<string, string>;
|
||||
export type UILocaleResources = Record<string, UILocaleResourceBundle>;
|
||||
export type UILocaleResourceInput = UILocaleResourceBundle | UILocaleResources;
|
||||
|
||||
const getDocumentLocale = () => {
|
||||
if (typeof document === 'undefined') return;
|
||||
|
||||
return document.documentElement.lang || undefined;
|
||||
};
|
||||
|
||||
const getNavigatorLocale = () => {
|
||||
if (typeof navigator === 'undefined') return;
|
||||
|
||||
return navigator.language || undefined;
|
||||
};
|
||||
|
||||
const getUILocale = (locale: string): string => {
|
||||
if (locale.startsWith('zh')) return 'zh-CN';
|
||||
if (locale.startsWith('en')) return 'en-US';
|
||||
return locale;
|
||||
};
|
||||
|
||||
const isFlatUILocaleResources = (
|
||||
resources: UILocaleResourceInput,
|
||||
): resources is UILocaleResourceBundle =>
|
||||
Object.values(resources).every((value) => typeof value === 'string');
|
||||
|
||||
const flattenUILocaleResources = (resources: UILocaleResourceInput): UILocaleResourceBundle =>
|
||||
isFlatUILocaleResources(resources) ? resources : Object.assign({}, ...Object.values(resources));
|
||||
|
||||
export const normalizeUILocaleResources = (
|
||||
resources: UILocaleResourceInput,
|
||||
): UILocaleResources => ({
|
||||
app: flattenUILocaleResources(resources),
|
||||
});
|
||||
|
||||
export const mergeUILocaleResources = (
|
||||
...resourcesList: (UILocaleResourceInput | null)[]
|
||||
): UILocaleResources | null => {
|
||||
const mergedResources = Object.assign(
|
||||
{},
|
||||
...resourcesList.filter(Boolean).map((resources) => flattenUILocaleResources(resources!)),
|
||||
);
|
||||
|
||||
return Object.keys(mergedResources).length > 0 ? { app: mergedResources } : null;
|
||||
};
|
||||
|
||||
export const resolveUILocale = (locale: string | 'auto') => {
|
||||
const effectiveLocale =
|
||||
locale === 'auto' ? (getDocumentLocale() ?? getNavigatorLocale() ?? DEFAULT_LANG) : locale;
|
||||
const normalizedLocale = normalizeLocale(effectiveLocale);
|
||||
|
||||
return {
|
||||
normalizedLocale,
|
||||
uiLocale: getUILocale(normalizedLocale),
|
||||
};
|
||||
};
|
||||
@@ -1,14 +1,11 @@
|
||||
import { normalizeLocale } from '@/locales/resources';
|
||||
import type { UILocaleResourceInput, UILocaleResources } from './getUILocaleAndResources.utils';
|
||||
import {
|
||||
mergeUILocaleResources,
|
||||
normalizeUILocaleResources,
|
||||
resolveUILocale,
|
||||
} from './getUILocaleAndResources.utils';
|
||||
|
||||
type UILocaleResources = Record<string, Record<string, string>>;
|
||||
|
||||
const uiLocaleLoaders = import.meta.glob<{ default: UILocaleResources }>('/locales/*/ui.json');
|
||||
|
||||
const getUILocale = (locale: string): string => {
|
||||
if (locale.startsWith('zh')) return 'zh-CN';
|
||||
if (locale.startsWith('en')) return 'en-US';
|
||||
return locale;
|
||||
};
|
||||
const uiLocaleLoaders = import.meta.glob<{ default: UILocaleResourceInput }>('/locales/*/ui.json');
|
||||
|
||||
const loadBusinessResources = async (locale: string): Promise<UILocaleResources | null> => {
|
||||
const key = `/locales/${locale}/ui.json`;
|
||||
@@ -16,7 +13,9 @@ const loadBusinessResources = async (locale: string): Promise<UILocaleResources
|
||||
if (!loader) return null;
|
||||
try {
|
||||
const mod = await loader();
|
||||
return mod.default as UILocaleResources;
|
||||
const resources = mod.default as UILocaleResourceInput | null;
|
||||
|
||||
return resources ? normalizeUILocaleResources(resources) : null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
@@ -36,15 +35,17 @@ const loadLobeUIBuiltinResources = async (locale: string): Promise<UILocaleResou
|
||||
export const getUILocaleAndResources = async (
|
||||
locale: string | 'auto',
|
||||
): Promise<{ locale: string; resources: UILocaleResources }> => {
|
||||
const effectiveLocale = locale === 'auto' ? 'en-US' : locale;
|
||||
const normalizedLocale = normalizeLocale(effectiveLocale);
|
||||
const uiLocale = getUILocale(normalizedLocale);
|
||||
const { normalizedLocale, uiLocale } = resolveUILocale(locale);
|
||||
|
||||
const resources =
|
||||
(await loadBusinessResources(normalizedLocale)) ??
|
||||
(await loadLobeUIBuiltinResources(normalizedLocale)) ??
|
||||
(await loadBusinessResources('en-US')) ??
|
||||
(await loadLobeUIBuiltinResources('en-US'));
|
||||
mergeUILocaleResources(
|
||||
await loadLobeUIBuiltinResources(normalizedLocale),
|
||||
await loadBusinessResources(normalizedLocale),
|
||||
) ??
|
||||
mergeUILocaleResources(
|
||||
await loadLobeUIBuiltinResources('en-US'),
|
||||
await loadBusinessResources('en-US'),
|
||||
);
|
||||
|
||||
if (!resources)
|
||||
throw new Error(
|
||||
|
||||
@@ -15,6 +15,8 @@ const lobeHubOnlineModelLocales = {
|
||||
'grok-4.20-beta-0309-non-reasoning.description': 'A non-reasoning variant for simple use cases',
|
||||
'MiniMax-M2.1-Lightning.description':
|
||||
'Powerful multilingual programming capabilities with faster and more efficient inference.',
|
||||
'qwen3.7-max.description':
|
||||
"Qwen3.7-Max is Alibaba Cloud's flagship agent-era model for complex coding, reasoning, office automation, and long-horizon autonomous workflows.",
|
||||
'seedream-5-0-260128.description':
|
||||
'ByteDance-Seedream-5.0-lite by BytePlus features web-retrieval-augmented generation for real-time information, enhanced complex prompt interpretation, and improved reference consistency for professional visual creation.',
|
||||
'fal-ai/bytedance/seedream/v4.5.description':
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
import { render, screen } from '@testing-library/react';
|
||||
import type { PropsWithChildren, ReactNode } from 'react';
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { ChatSettingsTabs } from '@/store/global/initialState';
|
||||
|
||||
import Content from './Content';
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
agentState: {
|
||||
activeAgentId: 'inbox-agent',
|
||||
config: {},
|
||||
isInbox: true,
|
||||
meta: {},
|
||||
optimisticUpdateAgentConfig: vi.fn(),
|
||||
optimisticUpdateAgentMeta: vi.fn(),
|
||||
},
|
||||
serverState: {
|
||||
featureFlags: {
|
||||
enableAgentSelfIteration: true,
|
||||
},
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('@lobehub/ui', () => ({
|
||||
Avatar: () => <div data-testid="avatar" />,
|
||||
Block: ({ children }: PropsWithChildren) => <div>{children}</div>,
|
||||
Flexbox: ({ children }: PropsWithChildren) => <div>{children}</div>,
|
||||
Icon: () => <span />,
|
||||
Text: ({ children }: PropsWithChildren) => <span>{children}</span>,
|
||||
}));
|
||||
|
||||
vi.mock('@/components/Menu', () => ({
|
||||
default: ({
|
||||
items = [],
|
||||
onClick,
|
||||
selectedKeys = [],
|
||||
}: {
|
||||
items?: { key?: string; label?: ReactNode }[];
|
||||
onClick?: ({ key }: { key: string }) => void;
|
||||
selectedKeys?: string[];
|
||||
}) => (
|
||||
<div data-selected={selectedKeys.join(',')} data-testid="agent-settings-menu">
|
||||
{items.map((item) => (
|
||||
<button
|
||||
key={item.key}
|
||||
type="button"
|
||||
onClick={() => item.key && onClick?.({ key: item.key })}
|
||||
>
|
||||
{item.label}
|
||||
</button>
|
||||
))}
|
||||
</div>
|
||||
),
|
||||
}));
|
||||
|
||||
vi.mock('@/features/AgentSetting', () => ({
|
||||
AgentSettings: ({ tab }: { tab: ChatSettingsTabs }) => (
|
||||
<div data-tab={tab} data-testid="agent-settings-content" />
|
||||
),
|
||||
}));
|
||||
|
||||
vi.mock('@/store/agent', () => {
|
||||
const useAgentStore = (selector: (state: typeof mocks.agentState) => unknown) =>
|
||||
selector(mocks.agentState);
|
||||
useAgentStore.getState = () => mocks.agentState;
|
||||
|
||||
return { useAgentStore };
|
||||
});
|
||||
|
||||
vi.mock('@/store/agent/selectors', () => ({
|
||||
agentSelectors: {
|
||||
currentAgentConfig: (state: typeof mocks.agentState) => state.config,
|
||||
currentAgentMeta: (state: typeof mocks.agentState) => state.meta,
|
||||
},
|
||||
builtinAgentSelectors: {
|
||||
isInboxAgent: (state: typeof mocks.agentState) => state.isInbox,
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('@/store/serverConfig', () => ({
|
||||
featureFlagsSelectors: (state: typeof mocks.serverState) => state.featureFlags,
|
||||
useServerConfigStore: (selector: (state: typeof mocks.serverState) => unknown) =>
|
||||
selector(mocks.serverState),
|
||||
}));
|
||||
|
||||
vi.mock('antd-style', () => ({
|
||||
useTheme: () => ({
|
||||
colorBgLayout: '#fff',
|
||||
colorBorderSecondary: '#eee',
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock('react-i18next', () => ({
|
||||
useTranslation: () => ({
|
||||
t: (key: string) => key,
|
||||
}),
|
||||
}));
|
||||
|
||||
describe('AgentSettings Content', () => {
|
||||
beforeEach(() => {
|
||||
mocks.agentState.isInbox = true;
|
||||
mocks.serverState.featureFlags.enableAgentSelfIteration = true;
|
||||
});
|
||||
|
||||
it('should select self iteration when inbox hides opening settings', () => {
|
||||
render(<Content />);
|
||||
|
||||
expect(screen.queryByRole('button', { name: 'agentTab.opening' })).not.toBeInTheDocument();
|
||||
expect(screen.getByRole('button', { name: 'agentTab.selfIteration' })).toBeInTheDocument();
|
||||
expect(screen.getByTestId('agent-settings-menu')).toHaveAttribute(
|
||||
'data-selected',
|
||||
ChatSettingsTabs.SelfIteration,
|
||||
);
|
||||
expect(screen.getByTestId('agent-settings-content')).toHaveAttribute(
|
||||
'data-tab',
|
||||
ChatSettingsTabs.SelfIteration,
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -5,7 +5,7 @@ import { type ItemType } from 'antd/es/menu/interface';
|
||||
import { useTheme } from 'antd-style';
|
||||
import isEqual from 'fast-deep-equal';
|
||||
import { ActivityIcon, MessageSquareHeartIcon } from 'lucide-react';
|
||||
import { memo, useMemo, useState } from 'react';
|
||||
import { memo, useEffect, useMemo, useState } from 'react';
|
||||
import { useTranslation } from 'react-i18next';
|
||||
import { shallow } from 'zustand/shallow';
|
||||
|
||||
@@ -29,6 +29,21 @@ const Content = memo(() => {
|
||||
const { enableAgentSelfIteration } = useServerConfigStore(featureFlagsSelectors);
|
||||
const [tab, setTab] = useState(ChatSettingsTabs.Opening);
|
||||
|
||||
const availableTabs = useMemo(
|
||||
() =>
|
||||
[
|
||||
!isInbox ? ChatSettingsTabs.Opening : null,
|
||||
enableAgentSelfIteration ? ChatSettingsTabs.SelfIteration : null,
|
||||
].filter(Boolean) as ChatSettingsTabs[],
|
||||
[isInbox, enableAgentSelfIteration],
|
||||
);
|
||||
|
||||
const activeTab = availableTabs.includes(tab) ? tab : availableTabs[0];
|
||||
|
||||
useEffect(() => {
|
||||
if (activeTab && activeTab !== tab) setTab(activeTab);
|
||||
}, [activeTab, tab]);
|
||||
|
||||
const updateAgentConfig = async (config: any) => {
|
||||
if (!agentId) return;
|
||||
await useAgentStore.getState().optimisticUpdateAgentConfig(agentId, config);
|
||||
@@ -41,23 +56,30 @@ const Content = memo(() => {
|
||||
|
||||
const menuItems: ItemType[] = useMemo(
|
||||
() =>
|
||||
[
|
||||
!isInbox
|
||||
? {
|
||||
icon: <Icon icon={MessageSquareHeartIcon} />,
|
||||
key: ChatSettingsTabs.Opening,
|
||||
label: t('agentTab.opening'),
|
||||
availableTabs
|
||||
.map((tab) => {
|
||||
switch (tab) {
|
||||
case ChatSettingsTabs.Opening: {
|
||||
return {
|
||||
icon: <Icon icon={MessageSquareHeartIcon} />,
|
||||
key: ChatSettingsTabs.Opening,
|
||||
label: t('agentTab.opening'),
|
||||
};
|
||||
}
|
||||
: null,
|
||||
enableAgentSelfIteration
|
||||
? {
|
||||
icon: <Icon icon={ActivityIcon} />,
|
||||
key: ChatSettingsTabs.SelfIteration,
|
||||
label: t('agentTab.selfIteration'),
|
||||
case ChatSettingsTabs.SelfIteration: {
|
||||
return {
|
||||
icon: <Icon icon={ActivityIcon} />,
|
||||
key: ChatSettingsTabs.SelfIteration,
|
||||
label: t('agentTab.selfIteration'),
|
||||
};
|
||||
}
|
||||
: null,
|
||||
].filter(Boolean) as ItemType[],
|
||||
[t, isInbox, enableAgentSelfIteration],
|
||||
default: {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
})
|
||||
.filter(Boolean) as ItemType[],
|
||||
[availableTabs, t],
|
||||
);
|
||||
|
||||
const displayTitle = isInbox ? 'Lobe AI' : meta.title || t('defaultSession', { ns: 'common' });
|
||||
@@ -105,7 +127,7 @@ const Content = memo(() => {
|
||||
<Menu
|
||||
selectable
|
||||
items={menuItems}
|
||||
selectedKeys={[tab]}
|
||||
selectedKeys={activeTab ? [activeTab] : []}
|
||||
style={{ width: '100%' }}
|
||||
onClick={({ key }) => setTab(key as ChatSettingsTabs)}
|
||||
/>
|
||||
@@ -116,15 +138,17 @@ const Content = memo(() => {
|
||||
paddingInline={64}
|
||||
style={{ overflow: 'scroll', width: '100%' }}
|
||||
>
|
||||
<Settings
|
||||
config={config}
|
||||
id={agentId}
|
||||
loading={false}
|
||||
meta={meta}
|
||||
tab={tab}
|
||||
onConfigChange={updateAgentConfig}
|
||||
onMetaChange={updateAgentMeta}
|
||||
/>
|
||||
{activeTab && (
|
||||
<Settings
|
||||
config={config}
|
||||
id={agentId}
|
||||
loading={false}
|
||||
meta={meta}
|
||||
tab={activeTab}
|
||||
onConfigChange={updateAgentConfig}
|
||||
onMetaChange={updateAgentMeta}
|
||||
/>
|
||||
)}
|
||||
</Flexbox>
|
||||
</Flexbox>
|
||||
);
|
||||
|
||||
@@ -1,45 +0,0 @@
|
||||
import isEqual from 'fast-deep-equal';
|
||||
import { memo, useCallback, useState } from 'react';
|
||||
|
||||
import { useAgentStore } from '@/store/agent';
|
||||
import { agentSelectors } from '@/store/agent/selectors';
|
||||
|
||||
import PublishButton from './PublishButton';
|
||||
import PublishResultModal from './PublishResultModal';
|
||||
|
||||
/**
|
||||
* Agent Publish Button Component
|
||||
*
|
||||
* Simplified version - backend now handles ownership check automatically.
|
||||
* The action type (submit vs upload) is determined by backend based on:
|
||||
* 1. Whether the identifier exists
|
||||
* 2. Whether the current user is the owner
|
||||
*/
|
||||
const AgentPublishButton = memo(() => {
|
||||
const meta = useAgentStore(agentSelectors.currentAgentMeta, isEqual);
|
||||
|
||||
const [showResultModal, setShowResultModal] = useState(false);
|
||||
const [publishedIdentifier, setPublishedIdentifier] = useState<string>();
|
||||
|
||||
const handlePublishSuccess = useCallback((identifier: string) => {
|
||||
setPublishedIdentifier(identifier);
|
||||
setShowResultModal(true);
|
||||
}, []);
|
||||
|
||||
// Determine action based on whether we have an existing marketIdentifier
|
||||
// Backend will verify ownership and decide to create new or update
|
||||
const action = meta?.marketIdentifier ? 'upload' : 'submit';
|
||||
|
||||
return (
|
||||
<>
|
||||
<PublishButton action={action} onPublishSuccess={handlePublishSuccess} />
|
||||
<PublishResultModal
|
||||
identifier={publishedIdentifier}
|
||||
open={showResultModal}
|
||||
onCancel={() => setShowResultModal(false)}
|
||||
/>
|
||||
</>
|
||||
);
|
||||
});
|
||||
|
||||
export default AgentPublishButton;
|
||||
@@ -0,0 +1,214 @@
|
||||
import { render, screen } from '@testing-library/react';
|
||||
import type { PropsWithChildren, ReactNode } from 'react';
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import Header from './index';
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
agentState: {
|
||||
activeAgentId: 'agent-1',
|
||||
canCurrentAgentPublishToCommunity: true,
|
||||
isCurrentAgentHeterogeneous: false,
|
||||
meta: {
|
||||
title: 'Test Agent',
|
||||
},
|
||||
systemRole: 'You are helpful.',
|
||||
},
|
||||
globalState: {
|
||||
isStatusInit: true,
|
||||
showAgentBuilderPanel: false,
|
||||
toggleAgentBuilderPanel: vi.fn(),
|
||||
},
|
||||
homeState: {
|
||||
removeAgent: vi.fn(),
|
||||
},
|
||||
marketAuth: {
|
||||
isAuthenticated: true,
|
||||
isLoading: false,
|
||||
signIn: vi.fn(),
|
||||
},
|
||||
marketPublish: {
|
||||
checkOwnership: vi.fn(),
|
||||
isPublishing: false,
|
||||
publish: vi.fn(),
|
||||
},
|
||||
navigate: vi.fn(),
|
||||
versionReviewStatus: {
|
||||
isUnderReview: false,
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('@lobehub/ui', () => ({
|
||||
ActionIcon: () => <button aria-label="more" type="button" />,
|
||||
DropdownMenu: ({
|
||||
children,
|
||||
items = [],
|
||||
}: PropsWithChildren<{
|
||||
items?: Array<{ key?: string; label?: ReactNode; type?: string }>;
|
||||
}>) => (
|
||||
<div>
|
||||
{children}
|
||||
<div data-testid="agent-profile-menu">
|
||||
{items
|
||||
.filter((item) => item.type !== 'divider')
|
||||
.map((item) => (
|
||||
<button key={item.key} type="button">
|
||||
{item.label}
|
||||
</button>
|
||||
))}
|
||||
</div>
|
||||
</div>
|
||||
),
|
||||
Flexbox: ({ children }: PropsWithChildren) => <div>{children}</div>,
|
||||
Icon: () => <span />,
|
||||
}));
|
||||
|
||||
vi.mock('@lobehub/ui/icons', () => ({
|
||||
ShapesUploadIcon: () => null,
|
||||
}));
|
||||
|
||||
vi.mock('antd', () => ({
|
||||
App: {
|
||||
useApp: () => ({
|
||||
modal: {
|
||||
confirm: vi.fn(),
|
||||
},
|
||||
}),
|
||||
},
|
||||
Modal: {
|
||||
confirm: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('lucide-react', () => ({
|
||||
BotMessageSquareIcon: () => null,
|
||||
MoreHorizontal: () => null,
|
||||
Settings2Icon: () => null,
|
||||
Trash: () => null,
|
||||
}));
|
||||
|
||||
vi.mock('react-i18next', () => ({
|
||||
useTranslation: () => ({
|
||||
t: (key: string) => key,
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock('react-router-dom', () => ({
|
||||
useNavigate: () => mocks.navigate,
|
||||
}));
|
||||
|
||||
vi.mock('@/components/AntdStaticMethods', () => ({
|
||||
message: {
|
||||
error: vi.fn(),
|
||||
success: vi.fn(),
|
||||
warning: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('@/const/layoutTokens', () => ({
|
||||
DESKTOP_HEADER_ICON_SMALL_SIZE: 24,
|
||||
}));
|
||||
|
||||
vi.mock('@/features/NavHeader', () => ({
|
||||
default: ({ left, right }: { left?: ReactNode; right?: ReactNode }) => (
|
||||
<header>
|
||||
{left}
|
||||
{right}
|
||||
</header>
|
||||
),
|
||||
}));
|
||||
|
||||
vi.mock('@/features/RightPanel/ToggleRightPanelButton', () => ({
|
||||
default: () => <button type="button">agentBuilder</button>,
|
||||
}));
|
||||
|
||||
vi.mock('@/layout/AuthProvider/MarketAuth', () => ({
|
||||
useMarketAuth: () => mocks.marketAuth,
|
||||
}));
|
||||
|
||||
vi.mock('@/layout/AuthProvider/MarketAuth/errors', () => ({
|
||||
resolveMarketAuthError: () => ({ code: 'unknown' }),
|
||||
}));
|
||||
|
||||
vi.mock('@/store/agent', () => ({
|
||||
useAgentStore: (selector: (state: typeof mocks.agentState) => unknown) =>
|
||||
selector(mocks.agentState),
|
||||
}));
|
||||
|
||||
vi.mock('@/store/agent/selectors', () => ({
|
||||
agentSelectors: {
|
||||
canCurrentAgentPublishToCommunity: (state: typeof mocks.agentState) =>
|
||||
state.canCurrentAgentPublishToCommunity,
|
||||
currentAgentMeta: (state: typeof mocks.agentState) => state.meta,
|
||||
currentAgentSystemRole: (state: typeof mocks.agentState) => state.systemRole,
|
||||
isCurrentAgentHeterogeneous: (state: typeof mocks.agentState) =>
|
||||
state.isCurrentAgentHeterogeneous,
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('@/store/global', () => ({
|
||||
useGlobalStore: (selector: (state: typeof mocks.globalState) => unknown) =>
|
||||
selector(mocks.globalState),
|
||||
}));
|
||||
|
||||
vi.mock('@/store/global/selectors', () => ({
|
||||
systemStatusSelectors: {
|
||||
isStatusInit: (state: typeof mocks.globalState) => state.isStatusInit,
|
||||
showAgentBuilderPanel: (state: typeof mocks.globalState) => state.showAgentBuilderPanel,
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('@/store/home', () => ({
|
||||
useHomeStore: (selector: (state: typeof mocks.homeState) => unknown) => selector(mocks.homeState),
|
||||
}));
|
||||
|
||||
vi.mock('./AgentForkTag', () => ({
|
||||
default: () => null,
|
||||
}));
|
||||
|
||||
vi.mock('./AgentPublishButton/ForkConfirmModal', () => ({
|
||||
default: () => null,
|
||||
}));
|
||||
|
||||
vi.mock('./AgentPublishButton/PublishResultModal', () => ({
|
||||
default: () => null,
|
||||
}));
|
||||
|
||||
vi.mock('./AgentPublishButton/useMarketPublish', () => ({
|
||||
useMarketPublish: () => mocks.marketPublish,
|
||||
}));
|
||||
|
||||
vi.mock('./AgentStatusTag', () => ({
|
||||
default: () => null,
|
||||
}));
|
||||
|
||||
vi.mock('./AutoSaveHint', () => ({
|
||||
default: () => null,
|
||||
}));
|
||||
|
||||
vi.mock('./AgentVersionReviewTag', () => ({
|
||||
default: () => null,
|
||||
useVersionReviewStatus: () => mocks.versionReviewStatus,
|
||||
}));
|
||||
|
||||
describe('Agent profile Header', () => {
|
||||
beforeEach(() => {
|
||||
mocks.agentState.canCurrentAgentPublishToCommunity = true;
|
||||
mocks.agentState.isCurrentAgentHeterogeneous = false;
|
||||
});
|
||||
|
||||
it('should show the community publish action for normal agents', () => {
|
||||
render(<Header />);
|
||||
|
||||
expect(screen.getByRole('button', { name: 'publishToCommunity' })).toBeInTheDocument();
|
||||
});
|
||||
|
||||
it('should hide the community publish action for heterogeneous and platform agents', () => {
|
||||
mocks.agentState.canCurrentAgentPublishToCommunity = false;
|
||||
mocks.agentState.isCurrentAgentHeterogeneous = true;
|
||||
|
||||
render(<Header />);
|
||||
|
||||
expect(screen.queryByRole('button', { name: 'publishToCommunity' })).not.toBeInTheDocument();
|
||||
});
|
||||
});
|
||||
@@ -36,6 +36,7 @@ const Header = memo(() => {
|
||||
const systemRole = useAgentStore(agentSelectors.currentAgentSystemRole);
|
||||
const activeAgentId = useAgentStore((s) => s.activeAgentId);
|
||||
const isHeterogeneous = useAgentStore(agentSelectors.isCurrentAgentHeterogeneous);
|
||||
const canPublishToCommunity = useAgentStore(agentSelectors.canCurrentAgentPublishToCommunity);
|
||||
const [showAgentBuilderPanel, toggleAgentBuilderPanel, isStatusInit] = useGlobalStore((s) => [
|
||||
systemStatusSelectors.showAgentBuilderPanel(s),
|
||||
s.toggleAgentBuilderPanel,
|
||||
@@ -148,13 +149,17 @@ const Header = memo(() => {
|
||||
onClick: () => useAgentStore.setState({ showAgentSetting: true }),
|
||||
},
|
||||
{ type: 'divider' as const },
|
||||
{
|
||||
icon: <Icon icon={ShapesUploadIcon} />,
|
||||
key: 'publish',
|
||||
label: t('publishToCommunity', { ns: 'setting' }),
|
||||
onClick: handlePublishClick,
|
||||
},
|
||||
{ type: 'divider' as const },
|
||||
...(canPublishToCommunity
|
||||
? [
|
||||
{
|
||||
icon: <Icon icon={ShapesUploadIcon} />,
|
||||
key: 'publish',
|
||||
label: t('publishToCommunity', { ns: 'setting' }),
|
||||
onClick: handlePublishClick,
|
||||
},
|
||||
{ type: 'divider' as const },
|
||||
]
|
||||
: []),
|
||||
{
|
||||
danger: true,
|
||||
icon: <Icon icon={Trash} />,
|
||||
@@ -163,7 +168,7 @@ const Header = memo(() => {
|
||||
onClick: handleDelete,
|
||||
},
|
||||
],
|
||||
[handlePublishClick, handleDelete, t],
|
||||
[canPublishToCommunity, handlePublishClick, handleDelete, t],
|
||||
);
|
||||
|
||||
return (
|
||||
@@ -182,7 +187,7 @@ const Header = memo(() => {
|
||||
<DropdownMenu items={menuItems}>
|
||||
<ActionIcon
|
||||
icon={MoreHorizontal}
|
||||
loading={isPublishing || isAuthLoading}
|
||||
loading={canPublishToCommunity && (isPublishing || isAuthLoading)}
|
||||
size={DESKTOP_HEADER_ICON_SMALL_SIZE}
|
||||
/>
|
||||
</DropdownMenu>
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { type AgentState } from '@lobechat/agent-runtime';
|
||||
import { type UIChatMessage } from '@lobechat/types';
|
||||
import debug from 'debug';
|
||||
|
||||
import { type AgentOperationMetadata, type StepResult } from './AgentStateManager';
|
||||
@@ -43,6 +44,17 @@ export interface AgentRuntimeCoordinatorOptions {
|
||||
* Defaults to automatic selection based on Redis availability
|
||||
*/
|
||||
streamEventManager?: IStreamEventManager;
|
||||
/**
|
||||
* Resolve the canonical UIChatMessage[] snapshot for a terminal-state
|
||||
* agent run, attached to `agent_runtime_end` events so the client can
|
||||
* use the pushed payload as Source of Truth instead of refetching from
|
||||
* DB.
|
||||
*
|
||||
* Optional: when omitted (e.g. tests, embedded usage without DB access)
|
||||
* the coordinator falls back to publishing without `uiMessages` and the
|
||||
* client behaves as before.
|
||||
*/
|
||||
uiMessagesResolver?: (state: AgentState) => Promise<UIChatMessage[] | undefined>;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -59,10 +71,12 @@ export interface AgentRuntimeCoordinatorOptions {
|
||||
export class AgentRuntimeCoordinator {
|
||||
private stateManager: IAgentStateManager;
|
||||
private streamEventManager: IStreamEventManager;
|
||||
private uiMessagesResolver?: (state: AgentState) => Promise<UIChatMessage[] | undefined>;
|
||||
|
||||
constructor(options?: AgentRuntimeCoordinatorOptions) {
|
||||
this.stateManager = options?.stateManager ?? createAgentStateManager();
|
||||
this.streamEventManager = options?.streamEventManager ?? createStreamEventManager();
|
||||
this.uiMessagesResolver = options?.uiMessagesResolver;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -94,6 +108,22 @@ export class AgentRuntimeCoordinator {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke the optional uiMessagesResolver and shield callers from its
|
||||
* failures — stream-event publishing must never fail the surrounding
|
||||
* save. Errors are logged and surfaced to the client as a missing field,
|
||||
* which falls back to the legacy refresh path.
|
||||
*/
|
||||
private async resolveUiMessages(state: AgentState): Promise<UIChatMessage[] | undefined> {
|
||||
if (!this.uiMessagesResolver) return undefined;
|
||||
try {
|
||||
return await this.uiMessagesResolver(state);
|
||||
} catch (error) {
|
||||
console.error('Failed to resolve uiMessages for agent_runtime_end:', error);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Save Agent state and handle corresponding events
|
||||
*/
|
||||
@@ -106,12 +136,13 @@ export class AgentRuntimeCoordinator {
|
||||
|
||||
// Send a terminal event once the operation first enters a terminal state.
|
||||
if (hasEnteredStreamEndState(previousState?.status, state.status)) {
|
||||
await this.streamEventManager.publishAgentRuntimeEnd(
|
||||
await this.streamEventManager.publishAgentRuntimeEnd({
|
||||
finalState: state,
|
||||
operationId,
|
||||
state.stepCount ?? previousState?.stepCount ?? 0,
|
||||
state,
|
||||
state.status,
|
||||
);
|
||||
reason: state.status,
|
||||
stepIndex: state.stepCount ?? previousState?.stepCount ?? 0,
|
||||
uiMessages: await this.resolveUiMessages(state),
|
||||
});
|
||||
log('[%s] Agent runtime reached terminal state: %s', operationId, state.status);
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -133,12 +164,14 @@ export class AgentRuntimeCoordinator {
|
||||
|
||||
// This ensures agent_runtime_end is sent after all step events.
|
||||
if (hasEnteredStreamEndState(previousState?.status, stepResult.newState.status)) {
|
||||
await this.streamEventManager.publishAgentRuntimeEnd(
|
||||
await this.streamEventManager.publishAgentRuntimeEnd({
|
||||
finalState: stepResult.newState,
|
||||
operationId,
|
||||
stepResult.newState.stepCount ?? stepResult.stepIndex ?? previousState?.stepCount ?? 0,
|
||||
stepResult.newState,
|
||||
stepResult.newState.status,
|
||||
);
|
||||
reason: stepResult.newState.status,
|
||||
stepIndex:
|
||||
stepResult.newState.stepCount ?? stepResult.stepIndex ?? previousState?.stepCount ?? 0,
|
||||
uiMessages: await this.resolveUiMessages(stepResult.newState),
|
||||
});
|
||||
log(
|
||||
'[%s] Agent runtime reached terminal state after step result: %s',
|
||||
operationId,
|
||||
|
||||
@@ -7,7 +7,7 @@ import {
|
||||
type StreamChunkData,
|
||||
type StreamEvent,
|
||||
} from './StreamEventManager';
|
||||
import type { IStreamEventManager } from './types';
|
||||
import type { IStreamEventManager, PublishAgentRuntimeEndParams } from './types';
|
||||
|
||||
const log = debug('lobe-server:agent-runtime:gateway-notifier');
|
||||
|
||||
@@ -78,26 +78,25 @@ export class GatewayStreamNotifier implements IStreamEventManager {
|
||||
return result;
|
||||
}
|
||||
|
||||
async publishAgentRuntimeEnd(
|
||||
operationId: string,
|
||||
stepIndex: number,
|
||||
finalState: any,
|
||||
reason?: string,
|
||||
reasonDetail?: string,
|
||||
): Promise<string> {
|
||||
const result = await this.inner.publishAgentRuntimeEnd(
|
||||
operationId,
|
||||
stepIndex,
|
||||
finalState,
|
||||
reason,
|
||||
reasonDetail,
|
||||
);
|
||||
async publishAgentRuntimeEnd(params: PublishAgentRuntimeEndParams): Promise<string> {
|
||||
const { operationId, stepIndex, finalState, reason, reasonDetail, uiMessages } = params;
|
||||
const result = await this.inner.publishAgentRuntimeEnd(params);
|
||||
|
||||
const effectiveReasonDetail = reasonDetail || getDefaultReasonDetail(finalState, reason);
|
||||
const errorType = finalState?.error?.type || finalState?.error?.errorType;
|
||||
|
||||
this.pushEvent(operationId, {
|
||||
data: { errorType, finalState, reason, reasonDetail: effectiveReasonDetail },
|
||||
// Forward `uiMessages` to the gateway push channel so terminal-state
|
||||
// clients consuming /push-event get the canonical UIChatMessage[]
|
||||
// snapshot — the final step has no later step_start to carry a fresh
|
||||
// snapshot, so dropping it here would break the SoT contract.
|
||||
data: {
|
||||
errorType,
|
||||
finalState,
|
||||
reason,
|
||||
reasonDetail: effectiveReasonDetail,
|
||||
...(uiMessages !== undefined && { uiMessages }),
|
||||
},
|
||||
operationId,
|
||||
stepIndex,
|
||||
timestamp: Date.now(),
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import debug from 'debug';
|
||||
|
||||
import { type StreamChunkData, type StreamEvent } from './StreamEventManager';
|
||||
import { type IStreamEventManager } from './types';
|
||||
import { type IStreamEventManager, type PublishAgentRuntimeEndParams } from './types';
|
||||
|
||||
const log = debug('lobe-server:agent-runtime:in-memory-stream-event-manager');
|
||||
|
||||
@@ -97,13 +97,14 @@ export class InMemoryStreamEventManager implements IStreamEventManager {
|
||||
});
|
||||
}
|
||||
|
||||
async publishAgentRuntimeEnd(
|
||||
operationId: string,
|
||||
stepIndex: number,
|
||||
finalState: any,
|
||||
reason?: string,
|
||||
reasonDetail?: string,
|
||||
): Promise<string> {
|
||||
async publishAgentRuntimeEnd({
|
||||
operationId,
|
||||
stepIndex,
|
||||
finalState,
|
||||
reason,
|
||||
reasonDetail,
|
||||
uiMessages,
|
||||
}: PublishAgentRuntimeEndParams): Promise<string> {
|
||||
return this.publishStreamEvent(operationId, {
|
||||
data: {
|
||||
finalState,
|
||||
@@ -111,6 +112,7 @@ export class InMemoryStreamEventManager implements IStreamEventManager {
|
||||
phase: 'execution_complete',
|
||||
reason: reason || 'completed',
|
||||
reasonDetail: reasonDetail || getDefaultReasonDetail(finalState, reason),
|
||||
...(uiMessages !== undefined && { uiMessages }),
|
||||
},
|
||||
stepIndex,
|
||||
type: 'agent_runtime_end',
|
||||
|
||||
@@ -272,6 +272,14 @@ export interface RuntimeExecutorContext {
|
||||
streamManager: IStreamEventManager;
|
||||
toolExecutionService: ToolExecutionService;
|
||||
topicId?: string;
|
||||
/**
|
||||
* Trace-pipeline sink for context engine input/output. Wired by
|
||||
* AgentRuntimeService so the trace recorder can pick CE data up
|
||||
* out-of-band, keeping the heavy CE payload (agentDocuments, systemRole, …)
|
||||
* out of the `events` array and therefore out of the Redis state pipeline.
|
||||
* See LOBE-9110.
|
||||
*/
|
||||
tracingContextEngine?: (input: unknown, output: unknown) => void;
|
||||
userId?: string;
|
||||
userTimezone?: string;
|
||||
}
|
||||
@@ -714,7 +722,7 @@ export const createRuntimeExecutors = (
|
||||
|
||||
processedMessages = await serverMessagesEngine(contextEngineInput);
|
||||
|
||||
// Emit context engine event for tracing
|
||||
// Hand context engine input/output to the trace sink out-of-band.
|
||||
// Omit large/redundant fields to reduce snapshot size:
|
||||
// - input.messages: reconstructible from step's messagesBaseline + messagesDelta
|
||||
// - input.toolsConfig: static per operation, ~47KB of manifests repeated every call_llm step
|
||||
@@ -724,14 +732,10 @@ export const createRuntimeExecutors = (
|
||||
toolsConfig: _toolsConfig,
|
||||
...contextEngineInputLite
|
||||
} = contextEngineInput;
|
||||
events.push({
|
||||
input: {
|
||||
...contextEngineInputLite,
|
||||
toolCount: _toolsConfig?.tools?.length ?? 0,
|
||||
},
|
||||
output: processedMessages,
|
||||
type: 'context_engine_result',
|
||||
} as any);
|
||||
ctx.tracingContextEngine?.(
|
||||
{ ...contextEngineInputLite, toolCount: _toolsConfig?.tools?.length ?? 0 },
|
||||
processedMessages,
|
||||
);
|
||||
} else {
|
||||
processedMessages = llmPayload.messages;
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import debug from 'debug';
|
||||
import { type Redis } from 'ioredis';
|
||||
|
||||
import { getAgentRuntimeRedisClient } from './redis';
|
||||
import { type PublishAgentRuntimeEndParams } from './types';
|
||||
|
||||
const log = debug('lobe-server:agent-runtime:stream-event-manager');
|
||||
const timing = debug('lobe-server:agent-runtime:timing');
|
||||
@@ -182,13 +183,14 @@ export class StreamEventManager {
|
||||
/**
|
||||
* Publish Agent runtime end event
|
||||
*/
|
||||
async publishAgentRuntimeEnd(
|
||||
operationId: string,
|
||||
stepIndex: number,
|
||||
finalState: any,
|
||||
reason?: string,
|
||||
reasonDetail?: string,
|
||||
): Promise<string> {
|
||||
async publishAgentRuntimeEnd({
|
||||
operationId,
|
||||
stepIndex,
|
||||
finalState,
|
||||
reason,
|
||||
reasonDetail,
|
||||
uiMessages,
|
||||
}: PublishAgentRuntimeEndParams): Promise<string> {
|
||||
return this.publishStreamEvent(operationId, {
|
||||
data: {
|
||||
finalState,
|
||||
@@ -196,6 +198,7 @@ export class StreamEventManager {
|
||||
phase: 'execution_complete',
|
||||
reason: reason || 'completed',
|
||||
reasonDetail: reasonDetail || getDefaultReasonDetail(finalState, reason),
|
||||
...(uiMessages !== undefined && { uiMessages }),
|
||||
},
|
||||
stepIndex,
|
||||
type: 'agent_runtime_end',
|
||||
|
||||
@@ -95,12 +95,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
await coordinator.saveAgentState(operationId, newState as any);
|
||||
|
||||
expect(mockStateManager.saveAgentState).toHaveBeenCalledWith(operationId, newState);
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId,
|
||||
newState.stepCount,
|
||||
newState,
|
||||
'done',
|
||||
);
|
||||
reason: 'done',
|
||||
stepIndex: newState.stepCount,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should publish end event when status changes to error', async () => {
|
||||
@@ -112,12 +113,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveAgentState(operationId, newState as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId,
|
||||
newState.stepCount,
|
||||
newState,
|
||||
'error',
|
||||
);
|
||||
reason: 'error',
|
||||
stepIndex: newState.stepCount,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should fallback to previous stepCount when terminal state is missing stepCount', async () => {
|
||||
@@ -129,12 +131,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveAgentState(operationId, newState as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId,
|
||||
previousState.stepCount,
|
||||
newState,
|
||||
'error',
|
||||
);
|
||||
reason: 'error',
|
||||
stepIndex: previousState.stepCount,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should publish end event when status changes to interrupted', async () => {
|
||||
@@ -146,12 +149,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveAgentState(operationId, newState as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId,
|
||||
newState.stepCount,
|
||||
newState,
|
||||
'interrupted',
|
||||
);
|
||||
reason: 'interrupted',
|
||||
stepIndex: newState.stepCount,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should publish end event when status changes to waiting_for_human so the client releases its loading state', async () => {
|
||||
@@ -163,12 +167,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveAgentState(operationId, newState as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId,
|
||||
newState.stepCount,
|
||||
newState,
|
||||
'waiting_for_human',
|
||||
);
|
||||
reason: 'waiting_for_human',
|
||||
stepIndex: newState.stepCount,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should not publish end event when status was already done', async () => {
|
||||
@@ -214,12 +219,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
expect(mockStateManager.loadAgentState).toHaveBeenCalledWith(operationId);
|
||||
expect(mockStateManager.saveStepResult).toHaveBeenCalledWith(operationId, stepResult);
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: stepResult.newState,
|
||||
operationId,
|
||||
5,
|
||||
stepResult.newState,
|
||||
'done',
|
||||
);
|
||||
reason: 'done',
|
||||
stepIndex: 5,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should publish end event when status becomes error', async () => {
|
||||
@@ -234,12 +240,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveStepResult(operationId, stepResult as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: stepResult.newState,
|
||||
operationId,
|
||||
5,
|
||||
stepResult.newState,
|
||||
'error',
|
||||
);
|
||||
reason: 'error',
|
||||
stepIndex: 5,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should fallback to stepResult.stepIndex when terminal step result state is missing stepCount', async () => {
|
||||
@@ -254,12 +261,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveStepResult(operationId, stepResult as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: stepResult.newState,
|
||||
operationId,
|
||||
stepResult.stepIndex,
|
||||
stepResult.newState,
|
||||
'error',
|
||||
);
|
||||
reason: 'error',
|
||||
stepIndex: stepResult.stepIndex,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should publish end event when status becomes waiting_for_human (paused awaiting approval)', async () => {
|
||||
@@ -274,12 +282,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveStepResult(operationId, stepResult as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: stepResult.newState,
|
||||
operationId,
|
||||
4,
|
||||
stepResult.newState,
|
||||
'waiting_for_human',
|
||||
);
|
||||
reason: 'waiting_for_human',
|
||||
stepIndex: 4,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should publish end event when status becomes interrupted', async () => {
|
||||
@@ -294,12 +303,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveStepResult(operationId, stepResult as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: stepResult.newState,
|
||||
operationId,
|
||||
5,
|
||||
stepResult.newState,
|
||||
'interrupted',
|
||||
);
|
||||
reason: 'interrupted',
|
||||
stepIndex: 5,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should not publish end event when status is not done', async () => {
|
||||
@@ -395,4 +405,105 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
expect(result).toBe(expectedHistory);
|
||||
});
|
||||
});
|
||||
|
||||
// Terminal events should carry the canonical UIChatMessage[] snapshot
|
||||
// when a resolver is wired so the client can use the pushed payload as
|
||||
// Source of Truth instead of refetching from DB.
|
||||
describe('uiMessagesResolver on agent_runtime_end', () => {
|
||||
it('passes resolver result through saveAgentState terminal publish', async () => {
|
||||
const uiMessages = [{ id: 'msg_1', role: 'user' }] as any[];
|
||||
const resolver = vi.fn().mockResolvedValue(uiMessages);
|
||||
const coordinatorWithResolver = new AgentRuntimeCoordinator({
|
||||
stateManager: mockStateManager,
|
||||
streamEventManager: mockStreamManager,
|
||||
uiMessagesResolver: resolver,
|
||||
});
|
||||
|
||||
const previousState = { status: 'running', stepCount: 3 };
|
||||
const newState = { status: 'done', stepCount: 5 };
|
||||
mockStateManager.loadAgentState.mockResolvedValue(previousState);
|
||||
|
||||
await coordinatorWithResolver.saveAgentState('op-1', newState as any);
|
||||
|
||||
expect(resolver).toHaveBeenCalledWith(newState);
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId: 'op-1',
|
||||
reason: 'done',
|
||||
stepIndex: 5,
|
||||
uiMessages,
|
||||
});
|
||||
});
|
||||
|
||||
it('passes resolver result through saveStepResult terminal publish', async () => {
|
||||
const uiMessages = [{ id: 'msg_a', role: 'assistantGroup' }] as any[];
|
||||
const resolver = vi.fn().mockResolvedValue(uiMessages);
|
||||
const coordinatorWithResolver = new AgentRuntimeCoordinator({
|
||||
stateManager: mockStateManager,
|
||||
streamEventManager: mockStreamManager,
|
||||
uiMessagesResolver: resolver,
|
||||
});
|
||||
|
||||
const stepResult = {
|
||||
executionTime: 100,
|
||||
newState: { status: 'done', stepCount: 4 },
|
||||
stepIndex: 4,
|
||||
};
|
||||
mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 3 });
|
||||
|
||||
await coordinatorWithResolver.saveStepResult('op-2', stepResult as any);
|
||||
|
||||
expect(resolver).toHaveBeenCalledWith(stepResult.newState);
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: stepResult.newState,
|
||||
operationId: 'op-2',
|
||||
reason: 'done',
|
||||
stepIndex: 4,
|
||||
uiMessages,
|
||||
});
|
||||
});
|
||||
|
||||
it('publishes with uiMessages=undefined when resolver rejects (must never fail the surrounding save)', async () => {
|
||||
const resolver = vi.fn().mockRejectedValue(new Error('db down'));
|
||||
const coordinatorWithResolver = new AgentRuntimeCoordinator({
|
||||
stateManager: mockStateManager,
|
||||
streamEventManager: mockStreamManager,
|
||||
uiMessagesResolver: resolver,
|
||||
});
|
||||
|
||||
const previousState = { status: 'running', stepCount: 3 };
|
||||
const newState = { status: 'error', stepCount: 5 };
|
||||
mockStateManager.loadAgentState.mockResolvedValue(previousState);
|
||||
|
||||
await coordinatorWithResolver.saveAgentState('op-3', newState as any);
|
||||
|
||||
expect(resolver).toHaveBeenCalled();
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId: 'op-3',
|
||||
reason: 'error',
|
||||
stepIndex: 5,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('publishes with uiMessages=undefined when no resolver is wired (default constructor)', async () => {
|
||||
// The default `coordinator` from the outer beforeEach is constructed
|
||||
// without a resolver — proves the field is genuinely optional and
|
||||
// legacy call sites stay unaffected.
|
||||
const previousState = { status: 'running', stepCount: 3 };
|
||||
const newState = { status: 'done', stepCount: 5 };
|
||||
mockStateManager.loadAgentState.mockResolvedValue(previousState);
|
||||
|
||||
await coordinator.saveAgentState('op-4', newState as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId: 'op-4',
|
||||
reason: 'done',
|
||||
stepIndex: 5,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -131,21 +131,27 @@ describe('GatewayStreamNotifier', () => {
|
||||
it('delegates to inner and returns its result', async () => {
|
||||
const finalState = { status: 'done' };
|
||||
|
||||
const result = await notifier.publishAgentRuntimeEnd('op-1', 2, finalState, 'completed');
|
||||
const params = {
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'completed',
|
||||
stepIndex: 2,
|
||||
};
|
||||
const result = await notifier.publishAgentRuntimeEnd(params);
|
||||
|
||||
expect(result).toBe('publishAgentRuntimeEnd-result');
|
||||
expect(inner.calls.publishAgentRuntimeEnd).toHaveLength(1);
|
||||
expect(inner.calls.publishAgentRuntimeEnd[0]).toEqual([
|
||||
'op-1',
|
||||
2,
|
||||
finalState,
|
||||
'completed',
|
||||
undefined,
|
||||
]);
|
||||
expect(inner.calls.publishAgentRuntimeEnd[0]).toEqual([params]);
|
||||
});
|
||||
|
||||
it('calls gateway push-event endpoint only (no update-status)', async () => {
|
||||
await notifier.publishAgentRuntimeEnd('op-1', 2, {}, 'completed', 'All done');
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState: {},
|
||||
operationId: 'op-1',
|
||||
reason: 'completed',
|
||||
reasonDetail: 'All done',
|
||||
stepIndex: 2,
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
@@ -163,7 +169,12 @@ describe('GatewayStreamNotifier', () => {
|
||||
},
|
||||
};
|
||||
|
||||
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error');
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'error',
|
||||
stepIndex: 0,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
@@ -176,7 +187,13 @@ describe('GatewayStreamNotifier', () => {
|
||||
error: { message: 'Some error' },
|
||||
};
|
||||
|
||||
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error', 'Custom detail');
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'error',
|
||||
reasonDetail: 'Custom detail',
|
||||
stepIndex: 0,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
@@ -189,7 +206,12 @@ describe('GatewayStreamNotifier', () => {
|
||||
error: { message: 'Budget exceeded', type: 'InsufficientBudgetForModel' },
|
||||
};
|
||||
|
||||
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error');
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'error',
|
||||
stepIndex: 0,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
@@ -205,7 +227,12 @@ describe('GatewayStreamNotifier', () => {
|
||||
},
|
||||
};
|
||||
|
||||
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error');
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'error',
|
||||
stepIndex: 0,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
@@ -214,13 +241,49 @@ describe('GatewayStreamNotifier', () => {
|
||||
});
|
||||
|
||||
it('errorType is undefined when no error in finalState', async () => {
|
||||
await notifier.publishAgentRuntimeEnd('op-1', 0, { status: 'done' }, 'completed');
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState: { status: 'done' },
|
||||
operationId: 'op-1',
|
||||
reason: 'completed',
|
||||
stepIndex: 0,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
const body = JSON.parse(pushCall![1].body);
|
||||
expect(body.event.data.errorType).toBeUndefined();
|
||||
});
|
||||
|
||||
it('forwards uiMessages to the gateway push payload when provided', async () => {
|
||||
const uiMessages = [{ id: 'msg_z', role: 'assistantGroup' }] as any;
|
||||
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState: { status: 'done' },
|
||||
operationId: 'op-1',
|
||||
reason: 'completed',
|
||||
stepIndex: 4,
|
||||
uiMessages,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
const body = JSON.parse(pushCall![1].body);
|
||||
expect(body.event.data.uiMessages).toEqual(uiMessages);
|
||||
});
|
||||
|
||||
it('omits uiMessages from the gateway push payload when not provided', async () => {
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState: { status: 'done' },
|
||||
operationId: 'op-1',
|
||||
reason: 'completed',
|
||||
stepIndex: 4,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
const body = JSON.parse(pushCall![1].body);
|
||||
expect(body.event.data).not.toHaveProperty('uiMessages');
|
||||
});
|
||||
});
|
||||
|
||||
// ─── Read/subscribe methods: must delegate directly to inner ───
|
||||
@@ -306,7 +369,12 @@ describe('GatewayStreamNotifier', () => {
|
||||
() => new Promise((_, reject) => setTimeout(() => reject(new Error('timeout')), 10)),
|
||||
);
|
||||
|
||||
const result = await notifier.publishAgentRuntimeEnd('op-1', 0, {}, 'completed');
|
||||
const result = await notifier.publishAgentRuntimeEnd({
|
||||
finalState: {},
|
||||
operationId: 'op-1',
|
||||
reason: 'completed',
|
||||
stepIndex: 0,
|
||||
});
|
||||
|
||||
expect(result).toBe('publishAgentRuntimeEnd-result');
|
||||
expect(inner.calls.publishAgentRuntimeEnd).toHaveLength(1);
|
||||
|
||||
@@ -157,4 +157,46 @@ describe('InMemoryStreamEventManager', () => {
|
||||
expect(manager.getAllEvents('op-1')).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
// agent_runtime_end optionally carries the canonical UIChatMessage[]
|
||||
// snapshot so the client can use the pushed payload as Source of Truth
|
||||
// instead of refetching from DB.
|
||||
describe('publishAgentRuntimeEnd uiMessages', () => {
|
||||
it('includes uiMessages in event data when provided', async () => {
|
||||
const uiMessages = [
|
||||
{ id: 'msg_u', role: 'user' },
|
||||
{ id: 'msg_a', role: 'assistantGroup' },
|
||||
] as any[];
|
||||
const finalState = { status: 'done', stepCount: 3 };
|
||||
|
||||
await manager.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'done',
|
||||
stepIndex: 3,
|
||||
uiMessages,
|
||||
});
|
||||
|
||||
const events = manager.getAllEvents('op-1');
|
||||
expect(events).toHaveLength(1);
|
||||
expect(events[0].type).toBe('agent_runtime_end');
|
||||
expect(events[0].data.uiMessages).toEqual(uiMessages);
|
||||
expect(events[0].data.finalState).toEqual(finalState);
|
||||
});
|
||||
|
||||
it('omits uiMessages when not provided (legacy callers stay unaffected)', async () => {
|
||||
const finalState = { status: 'done', stepCount: 3 };
|
||||
|
||||
await manager.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'done',
|
||||
stepIndex: 3,
|
||||
});
|
||||
|
||||
const events = manager.getAllEvents('op-1');
|
||||
expect(events[0].data).not.toHaveProperty('uiMessages');
|
||||
expect(events[0].data.finalState).toEqual(finalState);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -75,7 +75,11 @@ describe('StreamEventManager', () => {
|
||||
|
||||
mockRedis.xadd.mockResolvedValue('event-id-456');
|
||||
|
||||
const result = await streamManager.publishAgentRuntimeEnd(operationId, stepIndex, finalState);
|
||||
const result = await streamManager.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId,
|
||||
stepIndex,
|
||||
});
|
||||
|
||||
expect(result).toBe('event-id-456');
|
||||
expect(mockRedis.xadd).toHaveBeenCalledWith(
|
||||
@@ -103,6 +107,50 @@ describe('StreamEventManager', () => {
|
||||
);
|
||||
});
|
||||
|
||||
// agent_runtime_end optionally carries the canonical UIChatMessage[]
|
||||
// snapshot so the client can use the pushed payload as Source of Truth
|
||||
// instead of refetching from DB.
|
||||
it('should include uiMessages in serialized data when provided', async () => {
|
||||
const operationId = 'test-operation-id';
|
||||
const stepIndex = 5;
|
||||
const finalState = { status: 'done', stepCount: 5 };
|
||||
const uiMessages = [{ id: 'msg_a', role: 'assistantGroup' }] as any;
|
||||
|
||||
mockRedis.xadd.mockResolvedValue('event-id-ui');
|
||||
|
||||
await streamManager.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId,
|
||||
reason: 'done',
|
||||
stepIndex,
|
||||
uiMessages,
|
||||
});
|
||||
|
||||
// Find the serialized `data` argument inline so this test stays robust
|
||||
// if other positional args shift around.
|
||||
const dataArg = mockRedis.xadd.mock.calls[0]?.find(
|
||||
(a: any) => typeof a === 'string' && a.startsWith('{'),
|
||||
);
|
||||
const parsed = JSON.parse(dataArg);
|
||||
expect(parsed.uiMessages).toEqual(uiMessages);
|
||||
expect(parsed.finalState).toEqual(finalState);
|
||||
});
|
||||
|
||||
it('should omit uiMessages from serialized data when not provided', async () => {
|
||||
const operationId = 'test-operation-id';
|
||||
const finalState = { status: 'done', stepCount: 3 };
|
||||
|
||||
mockRedis.xadd.mockResolvedValue('event-id-noui');
|
||||
|
||||
await streamManager.publishAgentRuntimeEnd({ finalState, operationId, stepIndex: 3 });
|
||||
|
||||
const dataArg = mockRedis.xadd.mock.calls[0]?.find(
|
||||
(a: any) => typeof a === 'string' && a.startsWith('{'),
|
||||
);
|
||||
const parsed = JSON.parse(dataArg);
|
||||
expect(parsed).not.toHaveProperty('uiMessages');
|
||||
});
|
||||
|
||||
it('should accept custom reason and reasonDetail', async () => {
|
||||
const operationId = 'test-operation-id';
|
||||
const stepIndex = 3;
|
||||
@@ -112,13 +160,13 @@ describe('StreamEventManager', () => {
|
||||
|
||||
mockRedis.xadd.mockResolvedValue('event-id-789');
|
||||
|
||||
await streamManager.publishAgentRuntimeEnd(
|
||||
operationId,
|
||||
stepIndex,
|
||||
await streamManager.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId,
|
||||
reason,
|
||||
reasonDetail,
|
||||
);
|
||||
stepIndex,
|
||||
});
|
||||
|
||||
expect(mockRedis.xadd).toHaveBeenCalledWith(
|
||||
expect.any(String),
|
||||
@@ -158,7 +206,12 @@ describe('StreamEventManager', () => {
|
||||
|
||||
mockRedis.xadd.mockResolvedValue('event-id-790');
|
||||
|
||||
await streamManager.publishAgentRuntimeEnd(operationId, stepIndex, finalState, 'error');
|
||||
await streamManager.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId,
|
||||
reason: 'error',
|
||||
stepIndex,
|
||||
});
|
||||
|
||||
expect(mockRedis.xadd).toHaveBeenCalledWith(
|
||||
expect.any(String),
|
||||
|
||||
@@ -1,9 +1,24 @@
|
||||
import type { ToolExecuteData } from '@lobechat/agent-gateway-client';
|
||||
import { type AgentState } from '@lobechat/agent-runtime';
|
||||
import { type UIChatMessage } from '@lobechat/types';
|
||||
|
||||
import { type AgentOperationMetadata, type StepResult } from './AgentStateManager';
|
||||
import { type StreamChunkData, type StreamEvent } from './StreamEventManager';
|
||||
|
||||
export interface PublishAgentRuntimeEndParams {
|
||||
finalState: any;
|
||||
operationId: string;
|
||||
reason?: string;
|
||||
reasonDetail?: string;
|
||||
stepIndex: number;
|
||||
/**
|
||||
* Canonical UIChatMessage[] snapshot of the topic at terminal-state time.
|
||||
* When present, the client uses this directly as Source of Truth instead
|
||||
* of refetching from DB.
|
||||
*/
|
||||
uiMessages?: UIChatMessage[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Agent State Manager Interface
|
||||
* Abstract interface for state persistence, supports Redis and in-memory implementations
|
||||
@@ -114,15 +129,14 @@ export interface IStreamEventManager {
|
||||
getStreamHistory: (operationId: string, count?: number) => Promise<StreamEvent[]>;
|
||||
|
||||
/**
|
||||
* Publish Agent runtime end event
|
||||
* Publish Agent runtime end event.
|
||||
*
|
||||
* `uiMessages` is the canonical UIChatMessage[] snapshot of the topic at
|
||||
* terminal-state time so the client can use the pushed payload as Source
|
||||
* of Truth instead of refetching from DB. Optional: callers without DB
|
||||
* access may omit it and the client falls back to its existing behaviour.
|
||||
*/
|
||||
publishAgentRuntimeEnd: (
|
||||
operationId: string,
|
||||
stepIndex: number,
|
||||
finalState: any,
|
||||
reason?: string,
|
||||
reasonDetail?: string,
|
||||
) => Promise<string>;
|
||||
publishAgentRuntimeEnd: (params: PublishAgentRuntimeEndParams) => Promise<string>;
|
||||
|
||||
/**
|
||||
* Publish Agent runtime initialization event
|
||||
|
||||
@@ -132,13 +132,13 @@ export const agentNotifyRouter = router({
|
||||
const stream = getStreamManager();
|
||||
if (done) {
|
||||
// Signal task completion — frontend gateway WS subscription closes.
|
||||
await stream.publishAgentRuntimeEnd(
|
||||
remoteOperationId,
|
||||
0,
|
||||
{ reason: 'success' },
|
||||
'success',
|
||||
'Remote hetero agent task completed',
|
||||
);
|
||||
await stream.publishAgentRuntimeEnd({
|
||||
finalState: { reason: 'success' },
|
||||
operationId: remoteOperationId,
|
||||
reason: 'success',
|
||||
reasonDetail: 'Remote hetero agent task completed',
|
||||
stepIndex: 0,
|
||||
});
|
||||
} else {
|
||||
// Lightweight invalidation — frontend calls fetchAndReplaceMessages.
|
||||
await stream.publishStreamEvent(remoteOperationId, {
|
||||
|
||||
@@ -1478,4 +1478,55 @@ describe('AgentRuntimeService', () => {
|
||||
expect(mockCoordinator.saveAgentState).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
// Stream events at step / operation boundaries should carry the canonical
|
||||
// UIChatMessage[] snapshot so the client can use the pushed payload as
|
||||
// Source of Truth instead of refetching from DB.
|
||||
describe('queryUiMessages', () => {
|
||||
const stubMessageService = (svc: any, queryMessages: ReturnType<typeof vi.fn>) => {
|
||||
svc.messageServiceInstance = { queryMessages };
|
||||
};
|
||||
|
||||
it('returns messageService.queryMessages result when agentId + topicId are present', async () => {
|
||||
const stubMessages = [{ id: 'msg_x', role: 'user' }];
|
||||
const queryMessages = vi.fn().mockResolvedValue(stubMessages);
|
||||
stubMessageService(service, queryMessages);
|
||||
|
||||
const result = await service.queryUiMessages({
|
||||
metadata: { agentId: 'agt_1', topicId: 'tpc_1' },
|
||||
} as any);
|
||||
|
||||
expect(queryMessages).toHaveBeenCalledWith({ agentId: 'agt_1', topicId: 'tpc_1' });
|
||||
expect(result).toEqual(stubMessages);
|
||||
});
|
||||
|
||||
it('returns undefined when agentId or topicId is missing (skips empty-array push)', async () => {
|
||||
const queryMessages = vi.fn();
|
||||
stubMessageService(service, queryMessages);
|
||||
|
||||
const noAgent = await service.queryUiMessages({
|
||||
metadata: { topicId: 'tpc_1' },
|
||||
} as any);
|
||||
const noTopic = await service.queryUiMessages({
|
||||
metadata: { agentId: 'agt_1' },
|
||||
} as any);
|
||||
const noMeta = await service.queryUiMessages({} as any);
|
||||
|
||||
expect(noAgent).toBeUndefined();
|
||||
expect(noTopic).toBeUndefined();
|
||||
expect(noMeta).toBeUndefined();
|
||||
expect(queryMessages).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('returns undefined and never throws when DB query fails (stream events must not fail the step)', async () => {
|
||||
const queryMessages = vi.fn().mockRejectedValue(new Error('db down'));
|
||||
stubMessageService(service, queryMessages);
|
||||
|
||||
const result = await service.queryUiMessages({
|
||||
metadata: { agentId: 'agt_1', topicId: 'tpc_1' },
|
||||
} as any);
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -13,6 +13,7 @@ import {
|
||||
ChatErrorType,
|
||||
type ChatMessageError,
|
||||
type ExecSubAgentTaskParams,
|
||||
type UIChatMessage,
|
||||
} from '@lobechat/types';
|
||||
import debug from 'debug';
|
||||
import urlJoin from 'url-join';
|
||||
@@ -30,6 +31,7 @@ import { type IStreamEventManager } from '@/server/modules/AgentRuntime/types';
|
||||
import { emitAgentSignalSourceEvent } from '@/server/services/agentSignal';
|
||||
import { toAgentSignalTraceEvents } from '@/server/services/agentSignal/observability/traceEvents';
|
||||
import { mcpService } from '@/server/services/mcp';
|
||||
import { MessageService } from '@/server/services/message';
|
||||
import { QueueService } from '@/server/services/queue';
|
||||
import { LocalQueueServiceImpl } from '@/server/services/queue/impls';
|
||||
import { ToolExecutionService } from '@/server/services/toolExecution';
|
||||
@@ -182,6 +184,17 @@ export class AgentRuntimeService {
|
||||
private serverDB: LobeChatDatabase;
|
||||
private userId: string;
|
||||
private messageModel: MessageModel;
|
||||
// Lazily constructed because MessageService instantiates a FileService
|
||||
// which eagerly creates the S3 client and throws when S3 env vars are
|
||||
// missing — eager construction would break every test that builds an
|
||||
// AgentRuntimeService without mocking the file backend.
|
||||
private messageServiceInstance?: MessageService;
|
||||
private get messageService(): MessageService {
|
||||
if (!this.messageServiceInstance) {
|
||||
this.messageServiceInstance = new MessageService(this.serverDB, this.userId);
|
||||
}
|
||||
return this.messageServiceInstance;
|
||||
}
|
||||
|
||||
constructor(db: LobeChatDatabase, userId: string, options?: AgentRuntimeServiceOptions) {
|
||||
// Use factory function to auto-select Redis or InMemory implementation
|
||||
@@ -192,6 +205,10 @@ export class AgentRuntimeService {
|
||||
this.coordinator = new AgentRuntimeCoordinator({
|
||||
...options?.coordinatorOptions,
|
||||
streamEventManager: this.streamManager,
|
||||
// Provide the canonical UIChatMessage[] for terminal-state events so
|
||||
// the client can use the pushed payload directly instead of refetching
|
||||
// from DB. Falls back gracefully when topicId isn't set.
|
||||
uiMessagesResolver: (state) => this.queryUiMessages(state),
|
||||
});
|
||||
this.queueService =
|
||||
options?.queueService === null ? null : (options?.queueService ?? new QueueService());
|
||||
@@ -474,6 +491,31 @@ export class AgentRuntimeService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Query the canonical UIChatMessage[] snapshot for the active topic — the
|
||||
* same shape the `message.getMessages` trpc lambda returns to the client.
|
||||
* Attached to step_start / agent_runtime_end stream events so the client
|
||||
* can use the pushed payload directly instead of refetching from DB.
|
||||
*
|
||||
* Returns undefined when the topic isn't known yet (e.g. very early in
|
||||
* bootstrap before the topic row has been committed) so callers can skip
|
||||
* the `uiMessages` field entirely instead of pushing an empty array.
|
||||
*/
|
||||
async queryUiMessages(agentState: AgentState): Promise<UIChatMessage[] | undefined> {
|
||||
const agentId: string | undefined = agentState?.metadata?.agentId;
|
||||
const topicId: string | undefined = agentState?.metadata?.topicId;
|
||||
if (!agentId || !topicId) return undefined;
|
||||
|
||||
try {
|
||||
return await this.messageService.queryMessages({ agentId, topicId });
|
||||
} catch (error) {
|
||||
// Stream events must never fail the step. If the DB hiccups, fall back
|
||||
// to letting the client refresh as before.
|
||||
console.error('[queryUiMessages] Failed to load uiMessages snapshot: %O', error);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute Agent step
|
||||
*/
|
||||
@@ -515,20 +557,27 @@ export class AgentRuntimeService {
|
||||
try {
|
||||
log('[%s][%d] Start step executing...', operationId, stepIndex);
|
||||
|
||||
// Publish step start event
|
||||
await this.streamManager.publishStreamEvent(operationId, {
|
||||
data: {},
|
||||
stepIndex,
|
||||
type: 'step_start',
|
||||
});
|
||||
|
||||
// Get operation state and metadata
|
||||
// Load agent state BEFORE publishing step_start so we can attach the
|
||||
// canonical UIChatMessage snapshot to the event payload. step_start
|
||||
// fires after the previous step's DB writes are awaited durable, so
|
||||
// the snapshot query here reflects strongly-consistent state — that's
|
||||
// the contract that lets the client treat the pushed uiMessages as
|
||||
// the source of truth instead of doing its own refetch.
|
||||
const agentState = await this.coordinator.loadAgentState(operationId);
|
||||
|
||||
if (!agentState) {
|
||||
throw new Error(`Agent state not found for operation ${operationId}`);
|
||||
}
|
||||
|
||||
const stepStartUiMessages = await this.queryUiMessages(agentState);
|
||||
await this.streamManager.publishStreamEvent(operationId, {
|
||||
data: {
|
||||
...(stepStartUiMessages !== undefined && { uiMessages: stepStartUiMessages }),
|
||||
},
|
||||
stepIndex,
|
||||
type: 'step_start',
|
||||
});
|
||||
|
||||
agentState.metadata = {
|
||||
...agentState.metadata,
|
||||
externalRetryCount,
|
||||
@@ -622,6 +671,14 @@ export class AgentRuntimeService {
|
||||
log('[%s] beforeStep hook dispatch error: %O', operationId, hookError);
|
||||
}
|
||||
|
||||
// Per-step buffer for context engine input/output. Populated by the
|
||||
// `tracingContextEngine` callback passed into the executor context;
|
||||
// consumed by traceRecorder.appendStep below. Routing CE this way keeps
|
||||
// its heavy payload (agentDocuments, systemRole, …) out of
|
||||
// `stepResult.events` and therefore out of the Redis state pipeline.
|
||||
// See LOBE-9110.
|
||||
let contextEnginePayload: { input: unknown; output: unknown } | undefined;
|
||||
|
||||
// Create Agent and Runtime instances
|
||||
// Use agentState.metadata which contains the full app context (topicId, agentId, etc.)
|
||||
// operationMetadata only contains basic fields (agentConfig, modelRuntimeConfig, userId)
|
||||
@@ -629,6 +686,9 @@ export class AgentRuntimeService {
|
||||
metadata: agentState?.metadata,
|
||||
operationId,
|
||||
stepIndex,
|
||||
tracingContextEngine: (input, output) => {
|
||||
contextEnginePayload = { input, output };
|
||||
},
|
||||
});
|
||||
|
||||
// Handle human intervention
|
||||
@@ -800,6 +860,7 @@ export class AgentRuntimeService {
|
||||
afterStepSignalEvents,
|
||||
agentState,
|
||||
beforeStepSignalEvents,
|
||||
contextEngine: contextEnginePayload,
|
||||
currentContext,
|
||||
externalRetryCount,
|
||||
presentation: stepPresentationData,
|
||||
@@ -1337,10 +1398,12 @@ export class AgentRuntimeService {
|
||||
metadata,
|
||||
operationId,
|
||||
stepIndex,
|
||||
tracingContextEngine,
|
||||
}: {
|
||||
metadata?: any;
|
||||
operationId: string;
|
||||
stepIndex: number;
|
||||
tracingContextEngine?: (input: unknown, output: unknown) => void;
|
||||
}) {
|
||||
const contextWindowTokens =
|
||||
metadata?.modelRuntimeConfig?.model && metadata?.modelRuntimeConfig?.provider
|
||||
@@ -1386,6 +1449,7 @@ export class AgentRuntimeService {
|
||||
streamManager: this.streamManager,
|
||||
toolExecutionService: this.toolExecutionService,
|
||||
topicId: metadata?.topicId,
|
||||
tracingContextEngine,
|
||||
userId: metadata?.userId,
|
||||
};
|
||||
|
||||
|
||||
@@ -15,6 +15,13 @@ export interface AppendStepParams {
|
||||
*/
|
||||
agentState: any;
|
||||
beforeStepSignalEvents: SignalEvent[];
|
||||
/**
|
||||
* Context engine input/output captured for this step. Delivered via
|
||||
* `RuntimeExecutorContext.tracingContextEngine` rather than through the
|
||||
* `events` array, so CE payloads (agentDocuments, systemRole, …) stay out
|
||||
* of the Redis state pipeline. See LOBE-9110.
|
||||
*/
|
||||
contextEngine?: { input?: unknown; output?: unknown };
|
||||
currentContext?: { payload?: unknown; phase?: string; stepContext?: unknown };
|
||||
externalRetryCount: number;
|
||||
presentation: StepPresentationData;
|
||||
@@ -220,6 +227,7 @@ export class OperationTraceRecorder {
|
||||
agentState,
|
||||
afterStepSignalEvents,
|
||||
beforeStepSignalEvents,
|
||||
contextEngine: ceInput,
|
||||
currentContext,
|
||||
externalRetryCount,
|
||||
presentation,
|
||||
@@ -237,21 +245,20 @@ export class OperationTraceRecorder {
|
||||
const isBaseline = stepIndex === 0 || isCompression;
|
||||
const messagesDelta = afterMessages.slice(prevMessages.length);
|
||||
|
||||
// Extract context_engine_result into contextEngine (dedicated typed field).
|
||||
// CE data is structural state, not a streaming event — it lives separately
|
||||
// from events and uses the same delta pattern as messagesBaseline/messagesDelta.
|
||||
const rawEvents = (stepResult.events as any[]) ?? [];
|
||||
const ceEvent = rawEvents.find((e: any) => e.type === 'context_engine_result') as any;
|
||||
const contextEngine: StepSnapshot['contextEngine'] = ceEvent
|
||||
? { input: ceEvent.input, output: ceEvent.output }
|
||||
// CE data is structural state, not a streaming event — delivered via the
|
||||
// typed `contextEngine` field on AppendStepParams (sourced from
|
||||
// RuntimeExecutorContext.tracingContextEngine). Uses the same delta
|
||||
// pattern as messagesBaseline/messagesDelta.
|
||||
const contextEngine: StepSnapshot['contextEngine'] = ceInput
|
||||
? { input: ceInput.input, output: ceInput.output }
|
||||
: undefined;
|
||||
|
||||
// Strip heavy/redundant data from events before persisting to snapshot.
|
||||
// context_engine_result is excluded — stored in contextEngine instead.
|
||||
const rawEvents = (stepResult.events as any[]) ?? [];
|
||||
const snapshotEvents = [
|
||||
...beforeStepSignalEvents,
|
||||
...rawEvents
|
||||
.filter((e) => e.type !== 'llm_stream' && e.type !== 'context_engine_result')
|
||||
.filter((e) => e.type !== 'llm_stream')
|
||||
.map((e) => {
|
||||
if (e.type === 'done' && e.finalState) {
|
||||
// Remove reconstructible fields from finalState:
|
||||
|
||||
@@ -358,26 +358,29 @@ describe('OperationTraceRecorder', () => {
|
||||
recorder = new OperationTraceRecorder(store as any);
|
||||
});
|
||||
|
||||
const buildCeEvent = (overrides: Record<string, unknown> = {}) => ({
|
||||
const buildCe = (overrides: { input?: unknown; output?: unknown } = {}) => ({
|
||||
input: { messages: ['hello'] },
|
||||
output: { tokens: 42 },
|
||||
type: 'context_engine_result',
|
||||
...overrides,
|
||||
});
|
||||
|
||||
const appendStepWithCe = (ceEvent: Record<string, unknown>, prevStepsInStore: any[]) => {
|
||||
const appendStepWithCe = (
|
||||
ce: { input?: unknown; output?: unknown },
|
||||
prevStepsInStore: any[],
|
||||
) => {
|
||||
store.loadPartial.mockResolvedValue({ startedAt: 1, steps: prevStepsInStore });
|
||||
return recorder.appendStep('op-ce', {
|
||||
afterStepSignalEvents: [],
|
||||
agentState: { messages: [] },
|
||||
beforeStepSignalEvents: [],
|
||||
contextEngine: ce,
|
||||
currentContext: { phase: 'user_input' },
|
||||
externalRetryCount: 0,
|
||||
presentation: buildPresentation(),
|
||||
startedAt: 1000,
|
||||
stepIndex: prevStepsInStore.length,
|
||||
stepResult: {
|
||||
events: [ceEvent],
|
||||
events: [],
|
||||
newState: { activatedStepTools: [], messages: [] },
|
||||
},
|
||||
});
|
||||
@@ -388,8 +391,8 @@ describe('OperationTraceRecorder', () => {
|
||||
return saved.steps.find((s: any) => s.stepIndex === newStepIndex);
|
||||
};
|
||||
|
||||
it('extracts CE event into contextEngine, not in events', async () => {
|
||||
await appendStepWithCe(buildCeEvent(), []);
|
||||
it('routes CE input/output into contextEngine, never into events', async () => {
|
||||
await appendStepWithCe(buildCe(), []);
|
||||
|
||||
const step = getSavedStep(0);
|
||||
// CE data lives in contextEngine, not events
|
||||
@@ -398,7 +401,7 @@ describe('OperationTraceRecorder', () => {
|
||||
});
|
||||
|
||||
it('keeps both input and output on the first step (no previous CE to compare against)', async () => {
|
||||
await appendStepWithCe(buildCeEvent(), []);
|
||||
await appendStepWithCe(buildCe(), []);
|
||||
|
||||
const step = getSavedStep(0);
|
||||
expect(step.contextEngine.input).toEqual({ messages: ['hello'] });
|
||||
@@ -411,7 +414,7 @@ describe('OperationTraceRecorder', () => {
|
||||
stepIndex: 0,
|
||||
stepType: 'call_llm',
|
||||
};
|
||||
await appendStepWithCe(buildCeEvent(), [prevStep]);
|
||||
await appendStepWithCe(buildCe(), [prevStep]);
|
||||
|
||||
const step = getSavedStep(1);
|
||||
expect(step.contextEngine.input).toBeUndefined();
|
||||
@@ -424,10 +427,9 @@ describe('OperationTraceRecorder', () => {
|
||||
stepIndex: 0,
|
||||
stepType: 'call_llm',
|
||||
};
|
||||
await appendStepWithCe(
|
||||
buildCeEvent({ input: { messages: ['hello'] }, output: { tokens: 99 } }),
|
||||
[prevStep],
|
||||
);
|
||||
await appendStepWithCe(buildCe({ input: { messages: ['hello'] }, output: { tokens: 99 } }), [
|
||||
prevStep,
|
||||
]);
|
||||
|
||||
const step = getSavedStep(1);
|
||||
expect(step.contextEngine.input).toBeUndefined();
|
||||
@@ -440,10 +442,9 @@ describe('OperationTraceRecorder', () => {
|
||||
stepIndex: 0,
|
||||
stepType: 'call_llm',
|
||||
};
|
||||
await appendStepWithCe(
|
||||
buildCeEvent({ input: { messages: ['world'] }, output: { tokens: 42 } }),
|
||||
[prevStep],
|
||||
);
|
||||
await appendStepWithCe(buildCe({ input: { messages: ['world'] }, output: { tokens: 42 } }), [
|
||||
prevStep,
|
||||
]);
|
||||
|
||||
const step = getSavedStep(1);
|
||||
expect(step.contextEngine.input).toEqual({ messages: ['world'] });
|
||||
@@ -456,10 +457,9 @@ describe('OperationTraceRecorder', () => {
|
||||
stepIndex: 0,
|
||||
stepType: 'call_llm',
|
||||
};
|
||||
await appendStepWithCe(
|
||||
buildCeEvent({ input: { messages: ['new'] }, output: { tokens: 20 } }),
|
||||
[prevStep],
|
||||
);
|
||||
await appendStepWithCe(buildCe({ input: { messages: ['new'] }, output: { tokens: 20 } }), [
|
||||
prevStep,
|
||||
]);
|
||||
|
||||
const step = getSavedStep(1);
|
||||
expect(step.contextEngine.input).toEqual({ messages: ['new'] });
|
||||
@@ -476,7 +476,7 @@ describe('OperationTraceRecorder', () => {
|
||||
// step 1 has no contextEngine — dedup must skip it and walk back to step 0
|
||||
{ stepIndex: 1, stepType: 'call_tool' },
|
||||
];
|
||||
await appendStepWithCe(buildCeEvent(), prevSteps);
|
||||
await appendStepWithCe(buildCe(), prevSteps);
|
||||
|
||||
const step = getSavedStep(2);
|
||||
expect(step.contextEngine.input).toBeUndefined();
|
||||
@@ -492,7 +492,7 @@ describe('OperationTraceRecorder', () => {
|
||||
{ contextEngine: { output: { tokens: 42 } }, stepIndex: 1, stepType: 'call_llm' },
|
||||
];
|
||||
await appendStepWithCe(
|
||||
buildCeEvent({ input: { messages: ['hello'] }, output: { tokens: 42 } }),
|
||||
buildCe({ input: { messages: ['hello'] }, output: { tokens: 42 } }),
|
||||
prevSteps,
|
||||
);
|
||||
|
||||
@@ -501,7 +501,7 @@ describe('OperationTraceRecorder', () => {
|
||||
expect(step.contextEngine.output).toBeUndefined();
|
||||
});
|
||||
|
||||
it('sets contextEngine to undefined when the step has no context_engine_result event', async () => {
|
||||
it('sets contextEngine to undefined when no CE was recorded for the step', async () => {
|
||||
const prevStep = {
|
||||
contextEngine: { input: { messages: ['hello'] }, output: { tokens: 42 } },
|
||||
stepIndex: 0,
|
||||
|
||||
@@ -898,3 +898,82 @@ describe('AgentRuntimeService.executeStep - error-path snapshot finalize ()', ()
|
||||
dispatchSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
|
||||
// step_start event should carry the canonical UIChatMessage[] so the
|
||||
// client can use the pushed payload as Source of Truth.
|
||||
describe('AgentRuntimeService.executeStep - step_start uiMessages payload', () => {
|
||||
const createService = () => {
|
||||
return new AgentRuntimeService({} as any, 'user-1', { queueService: null });
|
||||
};
|
||||
|
||||
it('attaches uiMessages to step_start data when topic context is known', async () => {
|
||||
const service = createService();
|
||||
const coordinator = (service as any).coordinator;
|
||||
const streamManager = (service as any).streamManager;
|
||||
|
||||
coordinator.tryClaimStep = vi.fn().mockResolvedValue(true);
|
||||
// Force early-exit path so we don't need to mock the entire runtime
|
||||
// execution surface — terminal-state short-circuits right after
|
||||
// step_start publishes.
|
||||
coordinator.loadAgentState = vi.fn().mockResolvedValue({
|
||||
status: 'done',
|
||||
stepCount: 3,
|
||||
lastModified: new Date().toISOString(),
|
||||
metadata: { agentId: 'agt_1', topicId: 'tpc_1' },
|
||||
});
|
||||
streamManager.publishStreamEvent = vi.fn().mockResolvedValue(undefined);
|
||||
|
||||
// Inject a uiMessages-returning messageService — the runtime queries
|
||||
// through MessageService (not the bare messageModel) so that file URLs
|
||||
// go through FileService postProcessUrl.
|
||||
const stubMessages = [{ id: 'msg_1', role: 'user' }];
|
||||
(service as any).messageServiceInstance = {
|
||||
queryMessages: vi.fn().mockResolvedValue(stubMessages),
|
||||
};
|
||||
|
||||
await service.executeStep({
|
||||
operationId: 'op-uimsg',
|
||||
stepIndex: 5,
|
||||
context: { phase: 'user_input' } as any,
|
||||
});
|
||||
|
||||
// First publish call is step_start; assert its payload carries uiMessages.
|
||||
const stepStartCall = streamManager.publishStreamEvent.mock.calls.find(
|
||||
([, evt]: any) => evt?.type === 'step_start',
|
||||
);
|
||||
expect(stepStartCall).toBeDefined();
|
||||
expect(stepStartCall[1].data.uiMessages).toEqual(stubMessages);
|
||||
});
|
||||
|
||||
it('omits uiMessages from step_start data when topic context is unknown', async () => {
|
||||
const service = createService();
|
||||
const coordinator = (service as any).coordinator;
|
||||
const streamManager = (service as any).streamManager;
|
||||
|
||||
coordinator.tryClaimStep = vi.fn().mockResolvedValue(true);
|
||||
coordinator.loadAgentState = vi.fn().mockResolvedValue({
|
||||
status: 'done',
|
||||
stepCount: 3,
|
||||
lastModified: new Date().toISOString(),
|
||||
metadata: {}, // no agentId/topicId
|
||||
});
|
||||
streamManager.publishStreamEvent = vi.fn().mockResolvedValue(undefined);
|
||||
|
||||
const queryMock = vi.fn();
|
||||
(service as any).messageServiceInstance = { queryMessages: queryMock };
|
||||
|
||||
await service.executeStep({
|
||||
operationId: 'op-noctx',
|
||||
stepIndex: 5,
|
||||
context: { phase: 'user_input' } as any,
|
||||
});
|
||||
|
||||
const stepStartCall = streamManager.publishStreamEvent.mock.calls.find(
|
||||
([, evt]: any) => evt?.type === 'step_start',
|
||||
);
|
||||
expect(stepStartCall).toBeDefined();
|
||||
expect(stepStartCall[1].data).not.toHaveProperty('uiMessages');
|
||||
// Did not even attempt the DB query when context is missing.
|
||||
expect(queryMock).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -256,7 +256,11 @@ describe('AgentRuntimeService.executeSync', () => {
|
||||
|
||||
// Publish event after a small delay
|
||||
setTimeout(async () => {
|
||||
await streamEventManager.publishAgentRuntimeEnd(operationId, 1, { status: 'done' });
|
||||
await streamEventManager.publishAgentRuntimeEnd({
|
||||
finalState: { status: 'done' },
|
||||
operationId,
|
||||
stepIndex: 1,
|
||||
});
|
||||
}, 10);
|
||||
|
||||
const event = await streamEventManager.waitForEvent(operationId, 'agent_runtime_end', 1000);
|
||||
|
||||
@@ -873,7 +873,13 @@ export class AiAgentService {
|
||||
if (!result.success) {
|
||||
log('execAgent: remote hetero dispatch failed: %s', result.error);
|
||||
await streamManager
|
||||
.publishAgentRuntimeEnd(operationId, 0, { error: result.error }, 'error', result.error)
|
||||
.publishAgentRuntimeEnd({
|
||||
finalState: { error: result.error },
|
||||
operationId,
|
||||
reason: 'error',
|
||||
reasonDetail: result.error,
|
||||
stepIndex: 0,
|
||||
})
|
||||
.catch(() => {});
|
||||
await this.messageModel.update(assistantMsg.id, {
|
||||
content: '',
|
||||
|
||||
@@ -2,6 +2,7 @@ import { type LobeChatDatabase } from '@lobechat/database';
|
||||
import { CompressionRepository } from '@lobechat/database';
|
||||
import {
|
||||
type CreateMessageParams,
|
||||
type QueryMessageParams,
|
||||
type UIChatMessage,
|
||||
type UpdateMessageParams,
|
||||
} from '@lobechat/types';
|
||||
@@ -111,6 +112,18 @@ export class MessageService {
|
||||
return { messages, success: true };
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the canonical message list for an agent / topic scope, using the
|
||||
* standard UIChatMessage shape (file URLs resolved through FileService).
|
||||
*
|
||||
* Mirrors the read path exposed by the `message.getMessages` trpc lambda
|
||||
* so server-internal callers (e.g. agent runtime stream events) can push
|
||||
* the same payload the client would otherwise fetch.
|
||||
*/
|
||||
async queryMessages(params: QueryMessageParams): Promise<UIChatMessage[]> {
|
||||
return this.messageModel.query(params, this.getQueryOptions());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new message and return the complete message list
|
||||
* Pattern: create + query
|
||||
|
||||
@@ -360,6 +360,50 @@ describe('agentSelectors', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('canCurrentAgentPublishToCommunity', () => {
|
||||
it('should allow publishing normal agents', () => {
|
||||
const state = createState({
|
||||
activeAgentId: 'agent-1',
|
||||
agentMap: { 'agent-1': { id: 'agent-1' } },
|
||||
});
|
||||
|
||||
expect(agentSelectors.canCurrentAgentPublishToCommunity(state)).toBe(true);
|
||||
});
|
||||
|
||||
it('should prevent publishing local heterogeneous agents', () => {
|
||||
const state = createState({
|
||||
activeAgentId: 'agent-1',
|
||||
agentMap: {
|
||||
'agent-1': {
|
||||
agencyConfig: {
|
||||
heterogeneousProvider: { command: 'codex', type: 'codex' },
|
||||
},
|
||||
id: 'agent-1',
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(agentSelectors.canCurrentAgentPublishToCommunity(state)).toBe(false);
|
||||
});
|
||||
|
||||
it('should prevent publishing platform agents', () => {
|
||||
const state = createState({
|
||||
activeAgentId: 'agent-1',
|
||||
agentMap: {
|
||||
'agent-1': {
|
||||
agencyConfig: {
|
||||
boundDeviceId: 'device-1',
|
||||
heterogeneousProvider: { type: 'openclaw' },
|
||||
},
|
||||
id: 'agent-1',
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(agentSelectors.canCurrentAgentPublishToCommunity(state)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('currentKnowledgeIds', () => {
|
||||
it('should return enabled file and knowledge base IDs', () => {
|
||||
const state = createState({
|
||||
|
||||
@@ -286,6 +286,9 @@ const isCurrentAgentExternal = (s: AgentStoreState): boolean => !currentAgentDat
|
||||
const isCurrentAgentHeterogeneous = (s: AgentStoreState): boolean =>
|
||||
!!currentAgentConfig(s)?.agencyConfig?.heterogeneousProvider;
|
||||
|
||||
const canCurrentAgentPublishToCommunity = (s: AgentStoreState): boolean =>
|
||||
!!currentAgentData(s) && !isCurrentAgentHeterogeneous(s);
|
||||
|
||||
const currentAgentHeterogeneousProviderType = (s: AgentStoreState) =>
|
||||
currentAgentConfig(s)?.agencyConfig?.heterogeneousProvider?.type;
|
||||
|
||||
@@ -293,6 +296,7 @@ const getAgentDocumentsById = (agentId: string) => (s: AgentStoreState) =>
|
||||
s.agentDocumentsMap[agentId];
|
||||
|
||||
export const agentSelectors = {
|
||||
canCurrentAgentPublishToCommunity,
|
||||
currentAgentHeterogeneousProviderType,
|
||||
currentAgentAvatar,
|
||||
currentAgentBackgroundColor,
|
||||
|
||||
@@ -378,6 +378,139 @@ describe('runAgent actions', () => {
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('replaces messages from server-pushed uiMessages snapshot (SoT)', async () => {
|
||||
const replaceMessages = vi.fn();
|
||||
act(() => {
|
||||
useChatStore.setState({
|
||||
replaceMessages,
|
||||
operations: {
|
||||
[TEST_IDS.OPERATION_ID]: {
|
||||
abortController: new AbortController(),
|
||||
context: { agentId: 'agent-1', topicId: 'topic-1' },
|
||||
id: TEST_IDS.OPERATION_ID,
|
||||
metadata: { lastEventId: '0', startTime: Date.now(), stepCount: 0 },
|
||||
status: 'running',
|
||||
type: 'groupAgentGenerate',
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
const { result } = renderHook(() => useChatStore());
|
||||
const context = createStreamingContext({ assistantId: TEST_IDS.ASSISTANT_MESSAGE_ID });
|
||||
const uiMessages = [{ id: 'msg_a', role: 'user' }] as any;
|
||||
|
||||
const event: StreamEvent = {
|
||||
type: 'step_start',
|
||||
timestamp: Date.now(),
|
||||
operationId: TEST_IDS.OPERATION_ID,
|
||||
data: { phase: 'tool_execution', uiMessages },
|
||||
};
|
||||
|
||||
await act(async () => {
|
||||
await result.current.internal_handleAgentStreamEvent(
|
||||
TEST_IDS.OPERATION_ID,
|
||||
event,
|
||||
context,
|
||||
);
|
||||
});
|
||||
|
||||
expect(replaceMessages).toHaveBeenCalledWith(uiMessages, {
|
||||
context: { agentId: 'agent-1', topicId: 'topic-1' },
|
||||
});
|
||||
});
|
||||
|
||||
it('does not call replaceMessages when uiMessages absent on step_start', async () => {
|
||||
const replaceMessages = vi.fn();
|
||||
act(() => {
|
||||
useChatStore.setState({ replaceMessages });
|
||||
});
|
||||
const { result } = renderHook(() => useChatStore());
|
||||
const context = createStreamingContext({ assistantId: TEST_IDS.ASSISTANT_MESSAGE_ID });
|
||||
const event: StreamEvent = {
|
||||
type: 'step_start',
|
||||
timestamp: Date.now(),
|
||||
operationId: TEST_IDS.OPERATION_ID,
|
||||
data: { phase: 'tool_execution' }, // no uiMessages
|
||||
};
|
||||
|
||||
await act(async () => {
|
||||
await result.current.internal_handleAgentStreamEvent(
|
||||
TEST_IDS.OPERATION_ID,
|
||||
event,
|
||||
context,
|
||||
);
|
||||
});
|
||||
|
||||
expect(replaceMessages).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('agent_runtime_end event', () => {
|
||||
it('replaces messages from terminal uiMessages snapshot (final-step SoT)', async () => {
|
||||
const replaceMessages = vi.fn();
|
||||
act(() => {
|
||||
useChatStore.setState({
|
||||
replaceMessages,
|
||||
operations: {
|
||||
[TEST_IDS.OPERATION_ID]: {
|
||||
abortController: new AbortController(),
|
||||
context: { agentId: 'agent-1', topicId: 'topic-1' },
|
||||
id: TEST_IDS.OPERATION_ID,
|
||||
metadata: { lastEventId: '0', startTime: Date.now(), stepCount: 0 },
|
||||
status: 'running',
|
||||
type: 'groupAgentGenerate',
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
const { result } = renderHook(() => useChatStore());
|
||||
const uiMessages = [{ id: 'msg_final', role: 'assistantGroup' }] as any;
|
||||
|
||||
const event: StreamEvent = {
|
||||
type: 'agent_runtime_end',
|
||||
timestamp: Date.now(),
|
||||
operationId: TEST_IDS.OPERATION_ID,
|
||||
data: { finalState: { status: 'done' }, reason: 'done', uiMessages },
|
||||
};
|
||||
|
||||
await act(async () => {
|
||||
await result.current.internal_handleAgentStreamEvent(
|
||||
TEST_IDS.OPERATION_ID,
|
||||
event,
|
||||
createStreamingContext({ assistantId: TEST_IDS.ASSISTANT_MESSAGE_ID }),
|
||||
);
|
||||
});
|
||||
|
||||
expect(replaceMessages).toHaveBeenCalledWith(uiMessages, {
|
||||
context: { agentId: 'agent-1', topicId: 'topic-1' },
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('step_complete event', () => {
|
||||
// The previous DB-refetch on tool_execution was the source of the
|
||||
// assistantGroup-clobber regression (LOBE-9501) — tool results are
|
||||
// now reconciled via the next step_start's uiMessages snapshot.
|
||||
it('does NOT refreshMessages on tool_execution phase', async () => {
|
||||
const { result } = renderHook(() => useChatStore());
|
||||
const event: StreamEvent = {
|
||||
type: 'step_complete',
|
||||
timestamp: Date.now(),
|
||||
operationId: TEST_IDS.OPERATION_ID,
|
||||
data: { executionTime: 10, phase: 'tool_execution', result: { ok: true } },
|
||||
};
|
||||
|
||||
await act(async () => {
|
||||
await result.current.internal_handleAgentStreamEvent(
|
||||
TEST_IDS.OPERATION_ID,
|
||||
event,
|
||||
createStreamingContext({ assistantId: TEST_IDS.ASSISTANT_MESSAGE_ID }),
|
||||
);
|
||||
});
|
||||
|
||||
expect(result.current.refreshMessages).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -118,9 +118,19 @@ export class AgentActionImpl {
|
||||
|
||||
case 'agent_runtime_end': {
|
||||
// Agent runtime finished - this is the definitive signal that generation is complete
|
||||
const { reason, reasonDetail, finalState } = event.data || {};
|
||||
const { reason, reasonDetail, finalState, uiMessages } = event.data || {};
|
||||
log(`Agent runtime ended for ${assistantId}: reason=${reason}, detail=${reasonDetail}`);
|
||||
|
||||
// Server pushes the canonical UIChatMessage[] snapshot for the
|
||||
// topic as the Source of Truth on terminal-state. The last step
|
||||
// has no later step_start to carry a fresh snapshot, so without
|
||||
// this branch the streamed assistantGroup would only be reconciled
|
||||
// with DB once a refetch fires — losing the SoT guarantee.
|
||||
if (Array.isArray(uiMessages)) {
|
||||
log(`Replacing messages from agent_runtime_end uiMessages (${uiMessages.length} msgs)`);
|
||||
this.#get().replaceMessages(uiMessages, { context: operation.context });
|
||||
}
|
||||
|
||||
// Update operation metadata with final state
|
||||
if (finalState) {
|
||||
this.#get().updateOperationMetadata(operationId, {
|
||||
@@ -276,7 +286,19 @@ export class AgentActionImpl {
|
||||
}
|
||||
|
||||
case 'step_start': {
|
||||
const { phase, toolCall, pendingToolsCalling, requiresApproval } = event.data || {};
|
||||
const { phase, toolCall, pendingToolsCalling, requiresApproval, uiMessages } =
|
||||
event.data || {};
|
||||
|
||||
// Server attaches the canonical UIChatMessage[] snapshot to
|
||||
// step_start so the client uses the pushed payload as Source of
|
||||
// Truth instead of refetching from DB (the DB fan-out from the
|
||||
// previous step's stream chunks is async — a refetch here would
|
||||
// return a stale assistant placeholder that clobbers the
|
||||
// streamed assistantGroup).
|
||||
if (Array.isArray(uiMessages)) {
|
||||
log(`Replacing messages from step_start uiMessages (${uiMessages.length} msgs)`);
|
||||
this.#get().replaceMessages(uiMessages, { context: operation.context });
|
||||
}
|
||||
|
||||
if (phase === 'human_approval' && requiresApproval) {
|
||||
// Requires human approval
|
||||
@@ -301,8 +323,10 @@ export class AgentActionImpl {
|
||||
|
||||
if (phase === 'tool_execution' && result) {
|
||||
log(`Tool execution completed for ${assistantId} in ${executionTime}ms:`, result);
|
||||
// Refresh messages to display tool results
|
||||
await this.#get().refreshMessages();
|
||||
// Tool results are reconciled via the canonical uiMessages
|
||||
// snapshot the server pushes on the next step_start; no need
|
||||
// to refetch from DB here (the refetch was the source of the
|
||||
// assistantGroup-clobber regression that LOBE-9501 fixes).
|
||||
} else if (phase === 'execution_complete' && finalState) {
|
||||
// Agent execution complete
|
||||
log(`Agent execution completed for ${assistantId}:`, finalState);
|
||||
|
||||
@@ -82,7 +82,7 @@ describe('createGatewayEventHandler', () => {
|
||||
});
|
||||
|
||||
describe('stream_start', () => {
|
||||
it('should associate new message with operation', async () => {
|
||||
it('should associate new message with operation and skip the DB refetch (LOBE-9501)', async () => {
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
@@ -90,7 +90,12 @@ describe('createGatewayEventHandler', () => {
|
||||
await flush();
|
||||
|
||||
expect(store.associateMessageWithOperation).toHaveBeenCalledWith('msg-step2', 'op-1');
|
||||
expect(store.replaceMessages).toHaveBeenCalled();
|
||||
// Native gateway streams carry the new assistant id directly + a SoT
|
||||
// uiMessages snapshot on the preceding step_start, so stream_start must
|
||||
// NOT trigger a DB refetch (the refetch is what clobbered the streamed
|
||||
// assistantGroup with a stale placeholder).
|
||||
expect(messageService.getMessages).not.toHaveBeenCalled();
|
||||
expect(store.replaceMessages).not.toHaveBeenCalled();
|
||||
expect(emitClientAgentSignalSourceEvent).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
payload: expect.objectContaining({
|
||||
@@ -843,17 +848,14 @@ describe('createGatewayEventHandler', () => {
|
||||
});
|
||||
|
||||
describe('sequential processing', () => {
|
||||
it('should process stream_chunk only after stream_start refresh completes', async () => {
|
||||
it('should dispatch stream_chunk to the new assistant id after stream_start switches it', async () => {
|
||||
// Native gateway streams no longer await a DB fetch on stream_start
|
||||
// (LOBE-9501) — but stream_chunk must still queue behind stream_start
|
||||
// so the chunk targets the NEW assistant id (from stream_start.data),
|
||||
// not the previous one.
|
||||
const store = createMockStore();
|
||||
const callOrder: string[] = [];
|
||||
|
||||
const { messageService } = await import('@/services/message');
|
||||
(messageService.getMessages as any).mockImplementation(async () => {
|
||||
callOrder.push('refresh_start');
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
callOrder.push('refresh_end');
|
||||
return [];
|
||||
});
|
||||
store.internal_dispatchMessage.mockImplementation(() => {
|
||||
callOrder.push('dispatch');
|
||||
});
|
||||
@@ -867,10 +869,34 @@ describe('createGatewayEventHandler', () => {
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Hello' }));
|
||||
await flush();
|
||||
|
||||
const refreshEndIdx = callOrder.indexOf('refresh_end');
|
||||
// associate (from stream_start) precedes dispatch (from stream_chunk)
|
||||
const associateIdx = callOrder.indexOf('associate');
|
||||
const dispatchIdx = callOrder.indexOf('dispatch');
|
||||
expect(refreshEndIdx).toBeGreaterThan(-1);
|
||||
expect(dispatchIdx).toBeGreaterThan(refreshEndIdx);
|
||||
expect(associateIdx).toBeGreaterThan(-1);
|
||||
expect(dispatchIdx).toBeGreaterThan(associateIdx);
|
||||
|
||||
// Chunk targets the new id, proving the queue ordering held
|
||||
expect(store.internal_dispatchMessage).toHaveBeenLastCalledWith(
|
||||
expect.objectContaining({ id: 'msg-new', value: { content: 'Hello' } }),
|
||||
{ operationId: 'op-1' },
|
||||
);
|
||||
// And no DB refetch was issued for the native stream
|
||||
expect(messageService.getMessages).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should still fetch from DB on stream_start when assistantMessage id is absent (hetero CLI)', async () => {
|
||||
// Hetero CLI adapters (Claude Code / Codex) never set
|
||||
// `assistantMessage.id` on stream_start, so the DB read is still
|
||||
// mandatory — it pulls the executor-created placeholder into
|
||||
// `dbMessagesMap` so subsequent chunks have a target.
|
||||
const store = createMockStore();
|
||||
const handler = createHandler(store);
|
||||
|
||||
handler(makeEvent('stream_start', {}));
|
||||
await flush();
|
||||
|
||||
expect(messageService.getMessages).toHaveBeenCalled();
|
||||
expect(store.replaceMessages).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -901,18 +927,22 @@ describe('createGatewayEventHandler', () => {
|
||||
// Loading stays active between steps — only tool streaming is cleared
|
||||
expect(store.internal_toggleToolCallingStreaming).toHaveBeenCalledWith('msg-1', undefined);
|
||||
|
||||
// Tool execution
|
||||
// Tool execution — tool_end still refreshes from DB to pick up the
|
||||
// server-created tool message row.
|
||||
handler(makeEvent('tool_start', { parentMessageId: 'msg-1', toolCalling: tools[0] }));
|
||||
handler(makeEvent('tool_end', { isSuccess: true }));
|
||||
await flush();
|
||||
expect(store.replaceMessages).toHaveBeenCalled();
|
||||
|
||||
// Step 2: Next LLM call with new assistant message
|
||||
// Step 2: Next LLM call with new assistant message — native stream_start
|
||||
// carries the id directly, so it must NOT trigger a DB refetch
|
||||
// (LOBE-9501). Only the association switch happens.
|
||||
vi.clearAllMocks();
|
||||
handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-2' } }));
|
||||
await flush();
|
||||
expect(store.replaceMessages).toHaveBeenCalled();
|
||||
expect(store.associateMessageWithOperation).toHaveBeenCalledWith('msg-2', 'op-1');
|
||||
expect(messageService.getMessages).not.toHaveBeenCalled();
|
||||
expect(store.replaceMessages).not.toHaveBeenCalled();
|
||||
|
||||
handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Here are the results.' }));
|
||||
await flush();
|
||||
|
||||
@@ -7,7 +7,12 @@ import type {
|
||||
ToolExecuteData,
|
||||
ToolStartData,
|
||||
} from '@lobechat/agent-gateway-client';
|
||||
import type { BuiltinToolResult, ChatMessageError, ConversationContext } from '@lobechat/types';
|
||||
import type {
|
||||
BuiltinToolResult,
|
||||
ChatMessageError,
|
||||
ConversationContext,
|
||||
UIChatMessage,
|
||||
} from '@lobechat/types';
|
||||
import { AgentRuntimeErrorType } from '@lobechat/types';
|
||||
|
||||
import { messageService } from '@/services/message';
|
||||
@@ -276,24 +281,35 @@ export const createGatewayEventHandler = (
|
||||
accumulatedContent = '';
|
||||
accumulatedReasoning = '';
|
||||
|
||||
// Heterogeneous CLI adapters emit `stream_start { newStep: true }`
|
||||
// without a server-side assistant id. Pull the freshly created step
|
||||
// assistant from DB so subsequent live chunks update the RIGHT row
|
||||
// instead of appending onto the previous step's assistant.
|
||||
const messages = await fetchAndReplaceMessages(get, context).catch((error) => {
|
||||
console.error(error);
|
||||
return undefined;
|
||||
});
|
||||
// Skip the DB read ONLY for native gateway streams — those carry
|
||||
// `assistantMessage.id` directly on stream_start AND the preceding
|
||||
// `step_start` already carried the SoT uiMessages snapshot, so
|
||||
// chunks have a valid target in `dbMessagesMap` already. Removing
|
||||
// the await here is what un-blocks the enqueue chain so live
|
||||
// chunks can land mid-stream (LOBE-9501).
|
||||
//
|
||||
// Hetero CLI adapters (Claude Code / Codex) never set
|
||||
// `assistantMessage.id` on stream_start, so the DB read stays
|
||||
// mandatory for them — it (a) pulls the executor-created
|
||||
// placeholder into `dbMessagesMap` so subsequent chunks can
|
||||
// dispatch to it, and (b) resolves the next-step assistant id for
|
||||
// the `newStep` fallback.
|
||||
if (!newAssistantMessageId) {
|
||||
const messages = await fetchAndReplaceMessages(get, context).catch((error) => {
|
||||
console.error(error);
|
||||
return undefined;
|
||||
});
|
||||
|
||||
if (!newAssistantMessageId && data?.newStep) {
|
||||
const resolvedAssistantMessageId = findNextAssistantMessageId(
|
||||
messages as GatewayMessageLike[] | undefined,
|
||||
currentAssistantMessageId,
|
||||
);
|
||||
if (data?.newStep) {
|
||||
const resolvedAssistantMessageId = findNextAssistantMessageId(
|
||||
messages as GatewayMessageLike[] | undefined,
|
||||
currentAssistantMessageId,
|
||||
);
|
||||
|
||||
if (resolvedAssistantMessageId) {
|
||||
currentAssistantMessageId = resolvedAssistantMessageId;
|
||||
get().associateMessageWithOperation(currentAssistantMessageId, operationId);
|
||||
if (resolvedAssistantMessageId) {
|
||||
currentAssistantMessageId = resolvedAssistantMessageId;
|
||||
get().associateMessageWithOperation(currentAssistantMessageId, operationId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -406,8 +422,18 @@ export const createGatewayEventHandler = (
|
||||
pendingToolsCalling?: unknown[];
|
||||
phase?: string;
|
||||
requiresApproval?: boolean;
|
||||
uiMessages?: UIChatMessage[];
|
||||
};
|
||||
|
||||
// Server attaches the canonical UIChatMessage[] snapshot at every
|
||||
// step boundary (agent-runtime #15152). Use it as Source of Truth
|
||||
// instead of issuing a DB refetch — the refetch returns a stale
|
||||
// assistant placeholder while DB fan-out is still in flight, which
|
||||
// clobbers the in-memory streamed assistantGroup (LOBE-9501).
|
||||
if (Array.isArray(data?.uiMessages)) {
|
||||
get().replaceMessages(data.uiMessages, { action: 'gateway/step_start', context });
|
||||
}
|
||||
|
||||
if (data?.phase === 'human_approval' && data.requiresApproval && data.pendingToolsCalling) {
|
||||
void notifyDesktopHumanApprovalRequired(get, context);
|
||||
// Persist a paused marker so the sidebar reflects "waiting on user" across reload.
|
||||
@@ -475,6 +501,8 @@ export const createGatewayEventHandler = (
|
||||
|
||||
case 'agent_runtime_end': {
|
||||
enqueue(async () => {
|
||||
const data = event.data as { uiMessages?: UIChatMessage[] } | undefined;
|
||||
|
||||
void emitClientAgentSignalSourceEvent({
|
||||
payload: {
|
||||
agentId: context.agentId,
|
||||
@@ -499,7 +527,18 @@ export const createGatewayEventHandler = (
|
||||
get().markUnreadCompleted(completedOp.context.agentId, completedOp.context.topicId);
|
||||
}
|
||||
|
||||
await fetchAndReplaceMessages(get, context).catch(console.error);
|
||||
// Terminal step has no later step_start to carry SoT — server
|
||||
// pushes the canonical snapshot directly on this event. Fall back
|
||||
// to a DB refetch only if the snapshot is absent (older server
|
||||
// builds, or push-event delivery edge cases).
|
||||
if (Array.isArray(data?.uiMessages)) {
|
||||
get().replaceMessages(data.uiMessages, {
|
||||
action: 'gateway/agent_runtime_end',
|
||||
context,
|
||||
});
|
||||
} else {
|
||||
await fetchAndReplaceMessages(get, context).catch(console.error);
|
||||
}
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -106,8 +106,24 @@ export class MessageQueryActionImpl {
|
||||
|
||||
useFetchMessages = (
|
||||
context: ConversationContext,
|
||||
skipFetch?: boolean,
|
||||
options?: {
|
||||
/**
|
||||
* Skip the fetch entirely (e.g. while another flow owns the data).
|
||||
* Equivalent to passing a null SWR key.
|
||||
*/
|
||||
skipFetch?: boolean;
|
||||
/**
|
||||
* Revalidate when the window regains focus. Defaults to SWR's
|
||||
* client-data default (true). Pass `false` to suppress the focus
|
||||
* refetch — used during streaming so the in-memory stream payload
|
||||
* (Source of Truth) isn't clobbered by a stale DB read while DB
|
||||
* fan-out writes are still in flight.
|
||||
*/
|
||||
revalidateOnFocus?: boolean;
|
||||
},
|
||||
): SWRResponse<UIChatMessage[]> => {
|
||||
const { skipFetch, revalidateOnFocus } = options ?? {};
|
||||
|
||||
// Skip fetch when skipFetch is true or required fields are missing
|
||||
const shouldFetch = !skipFetch && !!context.agentId && !!context.topicId;
|
||||
|
||||
@@ -121,6 +137,7 @@ export class MessageQueryActionImpl {
|
||||
// Use replaceMessages to store the fetched messages
|
||||
this.#get().replaceMessages(data, { action: 'useFetchMessages', context });
|
||||
},
|
||||
...(revalidateOnFocus !== undefined && { revalidateOnFocus }),
|
||||
},
|
||||
);
|
||||
};
|
||||
|
||||
@@ -122,6 +122,9 @@ exports[`settingsSelectors > defaultAgent > should merge DEFAULT_AGENT and s.set
|
||||
"provider": "deepseek",
|
||||
},
|
||||
"searchMode": "auto",
|
||||
"selfIteration": {
|
||||
"enabled": false,
|
||||
},
|
||||
},
|
||||
"model": "gpt-3.5-turbo",
|
||||
"openingQuestions": [],
|
||||
@@ -166,6 +169,9 @@ exports[`settingsSelectors > defaultAgentConfig > should merge DEFAULT_AGENT_CON
|
||||
"provider": "deepseek",
|
||||
},
|
||||
"searchMode": "auto",
|
||||
"selfIteration": {
|
||||
"enabled": false,
|
||||
},
|
||||
},
|
||||
"model": "gpt-4",
|
||||
"openingQuestions": [],
|
||||
|
||||
Reference in New Issue
Block a user