feat(chat): consume gateway uiMessages snapshot as SoT at step boundaries (#15153)

* ♻️ refactor(chat-store): useFetchMessages accepts options object

LOBE-9501

Replace the positional `skipFetch?: boolean` second argument with an
`options?: { skipFetch?, revalidateOnFocus? }` object on both
`useChatStore.useFetchMessages` and `useConversationStore.useFetchMessages`.
Plumb `revalidateOnFocus` through to the underlying SWR config so callers
can suppress focus revalidate per-call (default behaviour unchanged).

Mechanically migrate all 7 call sites to the new shape. No behaviour
change in this commit — the streaming-aware `revalidateOnFocus: false`
follow-up lives in the next commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

*  feat(chat): consume gateway uiMessages snapshot as SoT at step boundaries

LOBE-9501

Server attaches the canonical UIChatMessage[] snapshot to step_start and
agent_runtime_end events (#15152). The client now uses that pushed payload
as the source of truth instead of refetching from DB:

- step_start handler calls replaceMessages(uiMessages, { context }) when
  the snapshot is present, so the assistant tab-switch / next-step path
  no longer issues a refetch that returns a stale assistant placeholder.
- agent_runtime_end handler does the same for the terminal step — the
  last step has no later step_start to carry a fresh snapshot, so this
  branch is the only one that reconciles the final commit.
- step_complete on phase=tool_execution stops calling refreshMessages.
  That refetch was the direct cause of the assistantGroup→assistant
  clobber regression captured by the agent-gateway probe scripts.
- ChatList disables SWR revalidateOnFocus while the current topic is
  streaming (via operationSelectors.isAgentRuntimeRunningByContext) and
  automatically restores it after the run ends. Tab-focus during a run
  no longer triggers the stale DB read.

Doesn't touch streamingExecutor.ts (homogeneous runtime — parallel path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 🐛 fix(chat-store): wire gateway handler to consume server-pushed uiMessages SoT

LOBE-9501

#15152 (server) attaches the canonical UIChatMessage[] snapshot to both
the Redis SSE channel and the gateway /push-event channel. The earlier
client patch wired the consumer into `runAgent.ts`, but that file only
runs on the Group Chat SSE path. The actual gateway entry point
(`createGatewayEventHandler` in `gatewayEventHandler.ts`, used by single
agent, sub-agent, and hetero-CLI flows) ignored the field entirely and
kept refetching from DB.

Fix the gateway handler:

- step_start: consume `event.data.uiMessages` and replaceMessages with
  the pushed SoT. Skipped when absent — hetero adapters don't emit
  step_start at all (HeterogeneousEventType excludes it), so the new
  branch is invisible to hetero.

- agent_runtime_end: same SoT consumption; the existing
  `fetchAndReplaceMessages` becomes the fallback for events without the
  field. Claude Code adapter emits agent_runtime_end with empty data,
  so hetero terminal behavior is preserved by the fallback.

- stream_start: gate the DB fetch on `!newAssistantMessageId`. Native
  gateway streams carry `assistantMessage.id` (the preceding step_start
  also delivered the SoT), so the await is unnecessary — AND it was
  blocking the enqueue chain. Live chunks queued behind that await
  could not dispatch, which manifested as "streaming content never
  lands in messagesMap" during tab-switch and slow-network repros.
  Hetero CLI streams never set `assistantMessage.id`, so the fetch
  still runs for them on every stream_start.

Verified with the agent-gateway probe (separate commit): chunks now
land in real time (cLen grows 3 → 529 monotonically), and tab-switch
mid-stream no longer rolls the streamed assistantGroup back to the
LOADING placeholder (ROLLBACKS=none in the analyzer output).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🧪 chore(local-testing): rewrite agent-gateway probes in TS + add CLI

LOBE-9501

Convert the local-testing agent-gateway probes from .js/.mjs to TypeScript
and add a unified `run.ts` CLI that bundles via Bun.build (no extra
deps) and persists dumps to a gitignored `.agent-gateway/` directory for
use as streaming-replay test fixtures.

- types.ts: shared dump shape (ProbeStreamEvent / ProbeTimelineSample /
  ProbeDump) and `declare global` for the `window.__PROBE_*` surface
- probe-events.ts: WebSocket + fetch interception (gateway WS captures
  any socket with `operationId=`; fetch captures `/api/agent/stream` for
  direct SSE). Per-key timeline samples every 200ms so we can see
  which messagesMap key streaming chunks actually land in
- probe-dump.ts: stops the timeline timer and stashes JSON dump on
  `window.__PROBE_LAST_DUMP_JSON` (runner returns that global)
- analyze-events.ts: stream events (non-chunk) + chunks summary +
  action-call stacks + correlation + per-key assistant growth +
  rollback detection. Per-key growth was added specifically to
  diagnose "chunks arrive but assistant cLen never moves"
- run.ts: `install` | `dump [name]` | `analyze [path]` CLI. Bundles via
  Bun.build, wraps as IIFE with explicit return, pipes to
  `agent-browser eval --stdin`. Dumps land at
  `.agent-gateway/<name>-<YYYYMMDD-HHmmss>.json`

`.agent-gateway/` is gitignored so dumps accumulate across debugging
sessions without polluting git.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🐛 fix(local-testing): repair run.ts after autofix mangled path imports

LOBE-9501

The eslint --fix run during the previous commit applied the unicorn
`import-style` rule and renamed every `join(` / `dirname(` / `resolve(`
to `path.join(` / `path.dirname(` / `path.resolve(`, but the replacement
was a naive text substitution that:

1. rewrote `array.join('\n')` to `array.path.join('\n')` — broke bundle
   error reporting (would TypeError on the build-failure path)
2. produced `const path = path.join(DUMP_DIR, filename)` inside cmdDump
   — shadowed the `path` module with itself, ReferenceError on every
   dump invocation

Rename the local `path` to `dumpPath` and drop the spurious `.path`
prefix on the array `.join`. Verified round-trip: install + dump now
write a valid capture to `.agent-gateway/`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🧪 chore(local-testing): capture per-call message snapshot in probe

LOBE-9501

The probe's `replaceMessages` wrapper used to record only `count` and
`params` — enough to see "two messages were written" but not WHICH two.
For post-stream collapse debugging we need to see whether each call
restored streamed content (cLen=N) or wiped to LOADING_FLAT (cLen=3).

Two changes:

- Capture `snapshot` field on every replaceMessages call: last 2
  messages' id / role / cLen / rLen / updatedAt. The analyzer prints
  this inline next to each call so reviewers can see content drift /
  collapse without re-reading the dump.

- Make wrapping idempotent across re-installs. The old guard
  `chat.__probeWrapped = true` froze the first-installed wrapper across
  re-installs, so updates to the probe body had no effect without a
  page reload. Stash the originals on
  `window.__PROBE_ORIG_REFRESH_MESSAGES` /
  `window.__PROBE_ORIG_REPLACE_MESSAGES` and re-wrap from those on
  every install.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🧪 chore(local-testing): add mutation log + dispatchMessage wrap to probe

LOBE-9501

The replaceMessages-only wrap couldn't catch chunk-level writes (those go
through internal_dispatchMessage) or attribute post-stream collapses to a
specific writer. Add:

- `__PROBE_MUTATIONS` — unified ordered log of every dbMessagesMap[key]
  reference change, with `last`/`prevLast` summaries and a `delta` field
  that tags interesting transitions (`cLen↓N→M`, `rLen↓`, `id:A→B`,
  `n↓prev→cur`). Both writers — replaceMessages AND internal_dispatchMessage
  — push to the same buffer so a single timeline shows all stores writes.

- Idempotent action wrapping. Originals are stashed on
  `window.__PROBE_ORIG_*` and re-wrapped from there on every install, so
  probe edits take effect without a page reload (previous
  `chat.__probeWrapped` flag froze the first wrapper).

- Snapshot field on replaceMessages — last 2 messages'
  id/role/cLen/rLen/updatedAt — so reviewers can see WHICH content each
  call is writing instead of just the count.

- Dump file now carries the `mutations` array alongside streamEvents,
  actionCalls, timeline.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🐛 fix(chat-store): gate SWR onData by isStreaming for streaming topic

LOBE-9501

Backstop for the post-stream cLen collapse that survives even with the
gateway SoT consume in place. Reproduction (confirmed):

1. Send a stream that lands lots of WS chunks into ChatStore
2. Immediately reload the page

If the page reload races against server-side chunk fan-out into Postgres,
SWR's fresh fetch returns the assistant row in its LOADING_FLAT placeholder
state (cLen=3) and writes that to ChatStore via the conversation-store
mirror — even though the WS push at agent_runtime_end carried the
correct full content moments earlier.

`mergeFetchedMessagesWithLocalState`'s updatedAt tie-breaker handles
this for in-session repros (local message wins when its updatedAt is
newer), but it degenerates when:

- The SoT consume just wrote server's snapshot updatedAt onto the local
  message, equalising the timestamps so the next stale DB fetch wins
- The user reloads (no local state to merge against — fresh fetch wins
  outright)

Add a gate at the bottom of `ConversationStore.useFetchMessages.onData`:
while `isAgentRuntimeRunningByContext(context)` is true, drop the SWR
write entirely. SWR's own cache still updates, so once streaming ends a
normal revalidate writes through correctly.

This is layered defense — it does NOT fix the underlying server-side
fan-out lag (filed as separate Linear issue). It does prevent the
client-side flash users currently see during the lag window.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🧪 test(chat-store): align gateway handler tests with SoT contract

The previous assertions still expected `stream_start` to issue a DB refetch
on every native gateway stream — the very behaviour LOBE-9501 removes
(`acb9523a04`). Update the three failing cases to the new contract:

- `stream_start > should associate new message with operation`:
  assert `messageService.getMessages` is NOT called when
  `assistantMessage.id` is present (the SoT snapshot from the preceding
  `step_start` already pre-populated `dbMessagesMap`).
- `sequential processing`: rewrite around the surviving ordering guarantee
  — `associate` (stream_start) must precede `dispatch` (stream_chunk) so
  the chunk targets the new id. Add a sibling case for hetero CLI streams
  (no `assistantMessage.id` → DB fetch is still mandatory).
- `multi-step integration > full LLM → tools → LLM cycle`: keep the
  post-`tool_end` `replaceMessages` assertion (tool_end still refreshes
  from DB), invert the post-`stream_start` assertion for step 2.

42 tests passing (was 41 + 1 new hetero fallback test).

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Arvin Xu
2026-05-24 20:05:58 +08:00
committed by GitHub
parent 5f6f053039
commit 837a3daa58
19 changed files with 1567 additions and 50 deletions
@@ -0,0 +1,243 @@
// Analyzer for probe-events dumps. Reads a JSON file produced by `run.ts dump`
// and prints a layered breakdown:
//
// 1. STREAM EVENTS — every non-chunk WS/SSE event in receipt order
// 2. CHUNKS SUMMARY — collapsed per-step chunk counts (otherwise floods)
// 3. ACTION CALLS — replaceMessages / refreshMessages / MARK:* with stack
// 4. CORRELATION — calls ↔ nearest stream event within ±300ms
// 5. PER-KEY ASSISTANT GROWTH — for each messagesMap key, when the leading
// assistant message's cLen / rLen actually moves (this is what reveals
// "chunks arrived but the message never grew" regressions)
// 6. ROLLBACKS — msgN / childN / role drops in the active-topic timeline
//
// Usage:
// bun run .agents/skills/local-testing/scripts/agent-gateway/analyze-events.ts <dump.json>
import { readFileSync } from 'node:fs';
import type {
ProbeActionCall,
ProbeDump,
ProbeMessageSummary,
ProbeStreamEvent,
ProbeTimelineSample,
} from './types';
const file = process.argv[2];
if (!file) {
console.error('usage: bun run analyze-events.ts <dump.json>');
process.exit(1);
}
const raw = readFileSync(file, 'utf8');
// agent-browser eval --stdin wraps return values in quotes when the value is
// a string — so the JSON file may be double-encoded depending on how it was
// captured. Handle both.
const parsedOnce = JSON.parse(raw) as ProbeDump | string;
const dump: ProbeDump = typeof parsedOnce === 'string' ? JSON.parse(parsedOnce) : parsedOnce;
const { streamEvents = [], actionCalls = [], timeline = [] } = dump;
const pad = (v: unknown, n: number) => String(v).padStart(n);
// ── META ───────────────────────────────────────────────────────────
console.log('=== META ===');
console.log(` events: ${streamEvents.length}`);
console.log(` calls: ${actionCalls.length}`);
console.log(` timeline: ${timeline.length}`);
// ── 1. STREAM EVENTS (non-chunk) ───────────────────────────────────
const nonChunkEvents = streamEvents.filter((e) => e.type !== 'stream_chunk');
const chunkEvents = streamEvents.filter((e) => e.type === 'stream_chunk');
console.log(
`\n=== STREAM EVENTS (${nonChunkEvents.length} non-chunk + ${chunkEvents.length} chunks elided) ===`,
);
for (const e of nonChunkEvents) {
const dataStr = e.dataKeys?.length ? ` [${e.dataKeys.join(',')}]` : '';
const data = e.data as Record<string, unknown> | undefined;
const uiHint = data?.uiMessagesPreview
? ` uiPreview=${JSON.stringify(data.uiMessagesPreview)}`
: data?.uiMessagesTotal
? ` uiTotal=${data.uiMessagesTotal}`
: '';
const phaseHint = data?.phase ? ` phase=${data.phase}` : '';
const extra = e.serverType ? ` serverType=${e.serverType}` : '';
console.log(
` t=${pad(e.t, 7)} [${(e.transport ?? '?').padEnd(3)}] step=${pad(e.stepIndex ?? '-', 2)} ` +
`type=${(e.type ?? '').padEnd(22)} op=${e.opIdTail ?? '-'}${phaseHint}${uiHint}${extra}${dataStr}`,
);
}
// ── 2. CHUNK SUMMARY ───────────────────────────────────────────────
console.log('\n=== CHUNKS SUMMARY (per step / chunkType) ===');
const chunkBuckets = new Map<string, { count: number; firstT: number; lastT: number }>();
for (const c of chunkEvents) {
const data = c.data as Record<string, unknown> | undefined;
const ct = (data?.chunkType as string | undefined) ?? '?';
const key = `step=${c.stepIndex ?? '-'} chunkType=${ct.padEnd(8)} op=${c.opIdTail}`;
const slot = chunkBuckets.get(key);
if (slot) {
slot.count += 1;
slot.lastT = c.t;
} else {
chunkBuckets.set(key, { count: 1, firstT: c.t, lastT: c.t });
}
}
for (const [k, v] of chunkBuckets) {
console.log(` ${k} count=${pad(v.count, 4)} t=${pad(v.firstT, 7)}..${pad(v.lastT, 7)}`);
}
// ── 3. ACTION CALLS ───────────────────────────────────────────────
console.log('\n=== ACTION CALLS (replace/refresh/MARK) ===');
for (const c of actionCalls) {
if (c.name?.startsWith('MARK:')) {
console.log(` t=${pad(c.t, 7)} ${c.name}`);
continue;
}
const snapshot = (c.args as any)?.snapshot as
| Array<{ id: string; role: string; cLen: number; rLen: number }>
| undefined;
const snapStr = snapshot?.length
? ' snapshot=' + snapshot.map((m) => `${m.id}:${m.role}/c${m.cLen}/r${m.rLen}`).join(' | ')
: '';
const summary =
c.name === 'replaceMessages'
? `count=${c.args?.count} action=${(c.args?.params as any)?.action ?? '-'}${snapStr}`
: c.name === 'refreshMessages'
? `ctx=${JSON.stringify(c.args?.context)}`
: c.error
? `error=${c.error}`
: '';
console.log(` t=${pad(c.t, 7)} ${c.name.padEnd(20)} ${summary}`);
if (c.stack) {
const frames = c.stack
.split(' ← ')
.filter((f) => !!f && !f.includes('Object.<anonymous>'))
.slice(0, 3);
for (const f of frames) console.log(`${f}`);
}
}
// ── 4. CORRELATION ────────────────────────────────────────────────
function nearestEventForCall(
call: ProbeActionCall,
windowMs = 300,
): { event: ProbeStreamEvent; delta: number } | null {
let best: ProbeStreamEvent | null = null;
let bestDelta = Infinity;
for (const e of streamEvents) {
const d = Math.abs(e.t - call.t);
if (d < bestDelta && d <= windowMs) {
bestDelta = d;
best = e;
}
}
return best ? { event: best, delta: bestDelta } : null;
}
console.log('\n=== CORRELATION (replace/refresh ↔ nearest event within ±300ms) ===');
for (const c of actionCalls) {
if (c.name !== 'refreshMessages' && c.name !== 'replaceMessages') continue;
const hit = nearestEventForCall(c);
if (hit) {
const phase = (hit.event.data as Record<string, unknown> | undefined)?.phase;
console.log(
` t=${pad(c.t, 7)} ${c.name.padEnd(16)} ← Δ${pad(hit.delta, 4)}ms ${hit.event.type}` +
(phase ? ` phase=${phase}` : ''),
);
} else {
console.log(` t=${pad(c.t, 7)} ${c.name.padEnd(16)} ← (no event nearby — external trigger)`);
}
}
// ── 5. PER-KEY ASSISTANT GROWTH ───────────────────────────────────
// For each messagesMap key, find the trailing assistant message and report
// the points in time where its cLen / rLen actually changed. If the timeline
// shows chunks arriving but the assistant cLen never moves, that's the
// signature of "dispatch queue blocked / messageId mismatch".
console.log('\n=== PER-KEY ASSISTANT GROWTH ===');
const keysEverSeen = new Set<string>();
for (const s of timeline) for (const k of Object.keys(s.byKey ?? {})) keysEverSeen.add(k);
for (const key of keysEverSeen) {
console.log(`\n key=${key}`);
let lastSig: string | null = null;
for (const s of timeline) {
const slot = s.byKey?.[key];
if (!slot) continue;
const last = slot.msgs.at(-1) as ProbeMessageSummary | undefined;
if (!last) continue;
const sig = `${last.id}|c${last.cLen}|r${last.rLen}|n${slot.n}`;
if (sig === lastSig) continue;
lastSig = sig;
console.log(
` t=${pad(s.t, 7)} msgN=${pad(slot.n, 3)} ` +
`lastAssistant=${last.id} cLen=${pad(last.cLen, 5)} rLen=${pad(last.rLen, 5)}` +
` runOps=${s.runOps}`,
);
}
}
// ── 6. ROLLBACKS (active-topic msgN / childN / role drops) ─────────
console.log('\n=== ROLLBACKS (active-topic msgN / childN / role drops) ===');
let prev: ProbeTimelineSample | null = null;
const rollbacks: Array<{ t: number; topic: string | null; drops: string[] }> = [];
const flatten = (s: ProbeTimelineSample) => {
if (!s.activeTopic) return [];
return Object.entries(s.byKey ?? {})
.filter(([k]) => k.includes(s.activeTopic!))
.flatMap(([, v]) => v.msgs);
};
for (const s of timeline) {
if (s.err) {
prev = null;
continue;
}
if (!prev || prev.activeTopic !== s.activeTopic) {
prev = s;
continue;
}
const prevMsgs = flatten(prev);
const curMsgs = flatten(s);
const drops: string[] = [];
if (curMsgs.length < prevMsgs.length) drops.push(`msgN ${prevMsgs.length}${curMsgs.length}`);
let prevChild = 0;
let curChild = 0;
for (const m of prevMsgs) prevChild += m.chN ?? 0;
for (const m of curMsgs) curChild += m.chN ?? 0;
if (curChild < prevChild) drops.push(`childN ${prevChild}${curChild}`);
const prevById = new Map(prevMsgs.map((m) => [m.id, m]));
for (const m of curMsgs) {
const pr = prevById.get(m.id);
if (!pr) continue;
if (m.cLen < pr.cLen) drops.push(`cLen[${m.id}] ${pr.cLen}${m.cLen}`);
if (m.rLen < pr.rLen) drops.push(`rLen[${m.id}] ${pr.rLen}${m.rLen}`);
}
if (drops.length) rollbacks.push({ t: s.t, topic: s.activeTopic, drops });
prev = s;
}
if (rollbacks.length === 0) {
console.log(' (none)');
} else {
for (const r of rollbacks) {
const nearEvent = streamEvents
.filter((e) => Math.abs(e.t - r.t) <= 300)
.map((e) => `${e.type}${(e.data as any)?.phase ? ':' + (e.data as any).phase : ''}`);
const nearCall = actionCalls
.filter((c) => Math.abs(c.t - r.t) <= 300 && !c.name?.startsWith('MARK:'))
.map((c) => c.name);
console.log(
` t=${pad(r.t, 7)} topic=${r.topic} ${r.drops.join(' | ')}` +
(nearEvent.length ? ` near-event:[${nearEvent.join(',')}]` : '') +
(nearCall.length ? ` near-call:[${nearCall.join(',')}]` : ''),
);
}
}
@@ -0,0 +1,37 @@
// Stops the events-probe timeline timer and stashes the full capture as a
// JSON string on `window.__PROBE_LAST_DUMP_JSON`. `run.ts` wraps the bundle
// in an IIFE that returns that global, which `agent-browser eval` prints to
// stdout — the runner then persists it under `.agent-gateway/`.
import type { ProbeDump } from './types';
declare global {
interface Window {
__PROBE_LAST_DUMP_JSON?: string;
}
}
const w = window;
if (w.__PROBE_TIMELINE_TIMER) {
clearInterval(w.__PROBE_TIMELINE_TIMER);
w.__PROBE_TIMELINE_TIMER = null;
}
const mutations = w.__PROBE_MUTATIONS ?? [];
const dump: ProbeDump & { mutations: typeof mutations } = {
meta: {
t0: w.__PROBE_T0 ?? 0,
collectedAt: Date.now(),
sampleCount: (w.__PROBE_MSG_TIMELINE ?? []).length,
eventCount: (w.__PROBE_STREAM_EVENTS ?? []).length,
callCount: (w.__PROBE_ACTION_CALLS ?? []).length,
},
streamEvents: w.__PROBE_STREAM_EVENTS ?? [],
actionCalls: w.__PROBE_ACTION_CALLS ?? [],
timeline: w.__PROBE_MSG_TIMELINE ?? [],
mutations,
};
w.__PROBE_LAST_DUMP_JSON = JSON.stringify(dump);
@@ -0,0 +1,637 @@
// LobeHub gateway raw-event-stream probe.
//
// Gateway-mode chats subscribe via WebSocket — NOT via the `/api/agent/stream`
// SSE endpoint (that one belongs to the direct/client durable-agent runtime).
// `AgentStreamClient` (`packages/agent-gateway-client/src/client.ts`) opens
// `new WebSocket('wss://.../ws?operationId=...')`, then parses JSON frames in
// its `onmessage` handler and re-emits `agent_event.event` objects to the
// chat store.
//
// To capture the RAW gateway events before the store touches them, we wrap
// `window.WebSocket` so that for any socket whose URL contains `operationId=`
// we intercept the `onmessage` handler / `addEventListener('message')` and
// log every `agent_event` frame.
//
// We *also* keep the `window.fetch` hook for `/api/agent/stream` so this
// probe still works for direct-mode runs — but gateway-mode events come
// through the WebSocket path.
//
// Buffers (read via `dump`):
// __PROBE_STREAM_EVENTS — raw events parsed off the wire
// __PROBE_ACTION_CALLS — replaceMessages / refreshMessages calls (best-effort)
// __PROBE_MSG_TIMELINE — 200ms snapshots of every messagesMap key
import type {
ProbeActionCall,
ProbeMessageSummary,
ProbeStreamEvent,
ProbeTimelineSample,
} from './types';
// Bundled by esbuild as an IIFE. Top-level code runs once on injection.
const w = window;
// ── Buffers ─────────────────────────────────────────────────────────
declare global {
interface Window {
__PROBE_MUTATIONS?: Array<{
t: number;
key: string;
n: number;
last?: { id: string; role: string; cLen: number; rLen: number; updatedAt?: unknown };
prevLast?: { id: string; role: string; cLen: number; rLen: number };
delta?: string;
}>;
__PROBE_STORE_UNSUB?: () => void;
}
}
const events: ProbeStreamEvent[] = (w.__PROBE_STREAM_EVENTS ??= []);
const calls: ProbeActionCall[] = (w.__PROBE_ACTION_CALLS ??= []);
const timeline: ProbeTimelineSample[] = (w.__PROBE_MSG_TIMELINE ??= []);
const mutations = (w.__PROBE_MUTATIONS ??= []);
events.length = 0;
calls.length = 0;
timeline.length = 0;
mutations.length = 0;
const t0 = Date.now();
w.__PROBE_T0 = t0;
const now = (): number => Date.now() - t0;
// ── Helpers ─────────────────────────────────────────────────────────
function summarizeData(data: unknown): Record<string, unknown> | unknown {
if (!data || typeof data !== 'object') return data;
const src = data as Record<string, unknown>;
const out: Record<string, unknown> = {};
for (const k of Object.keys(src)) {
const v = src[k];
if (v == null) {
out[k] = v;
} else if (Array.isArray(v)) {
out[k] = `Array(${v.length})`;
if (k === 'uiMessages') {
out.uiMessagesPreview = v.slice(0, 5).map((m: any) => ({
id: (m.id ?? '').slice(-8),
role: m.role,
cLen: (m.content ?? '').length,
children: (m.children ?? []).length,
tools: (m.tools ?? []).length,
reasoning: (m.reasoning?.content ?? '').length,
}));
out.uiMessagesTotal = v.length;
}
} else if (typeof v === 'object') {
const obj = v as Record<string, unknown>;
out[k] =
'Object{' +
Object.keys(obj)
.slice(0, 6)
.map((kk) => kk + (typeof obj[kk] === 'string' ? `=${(obj[kk] as string).length}ch` : ''))
.join(',') +
'}';
} else if (typeof v === 'string') {
out[k] = v.length > 100 ? v.slice(0, 100) + `…(${v.length})` : v;
} else {
out[k] = v;
}
}
return out;
}
function summarizeMessages(msgs: any[]): ProbeMessageSummary[] {
return (msgs ?? []).slice(0, 80).map((m) => ({
id: (m.id ?? '').slice(-8),
role: m.role,
cLen: (m.content ?? '').length,
rLen: (m.reasoning?.content ?? '').length,
tools: (m.tools ?? []).length,
chN: (m.children ?? []).length,
}));
}
function shortStack(): string {
const raw = new Error('probe-stack').stack ?? '';
return raw
.split('\n')
.slice(3)
.filter((l) => !l.includes('probe-events') && !l.includes('node_modules'))
.map((l) => l.trim().replace(/^at\s+/, ''))
.slice(0, 6)
.join(' ← ');
}
function recordAgentEvent(args: {
transport: 'ws' | 'sse';
opId: string | null;
agentEvent: any;
eventId?: string | null;
rawLen?: number;
}): void {
const { transport, opId, agentEvent, eventId, rawLen } = args;
if (!agentEvent || typeof agentEvent !== 'object') return;
events.push({
t: now(),
transport,
opIdTail: (opId ?? '').slice(-10),
eventId: eventId ?? null,
type: agentEvent.type,
stepIndex: agentEvent.stepIndex,
dataKeys: agentEvent.data ? Object.keys(agentEvent.data) : [],
data: summarizeData(agentEvent.data) as Record<string, unknown>,
rawLen,
});
}
// ── 1. Patch window.WebSocket for gateway WS events ────────────────
if (!w.__PROBE_ORIG_WEBSOCKET) w.__PROBE_ORIG_WEBSOCKET = w.WebSocket;
const OrigWS = w.__PROBE_ORIG_WEBSOCKET;
function extractOpIdFromWsUrl(url: string | URL): string | null {
const m = String(url ?? '').match(/operationId=([^&]+)/);
return m ? decodeURIComponent(m[1]) : null;
}
function isGatewayWs(url: string | URL): boolean {
return String(url ?? '').includes('operationId=');
}
function handleWsFrame(rawData: unknown, opId: string | null): void {
const rawLen = typeof rawData === 'string' ? rawData.length : -1;
let parsed: any;
try {
parsed = typeof rawData === 'string' ? JSON.parse(rawData) : null;
} catch {
events.push({
t: now(),
transport: 'ws',
opIdTail: (opId ?? '').slice(-10),
type: '_PARSE_ERROR_',
raw: typeof rawData === 'string' && rawData.length < 400 ? rawData : '(non-string or large)',
});
return;
}
if (!parsed) return;
if (parsed.type === 'agent_event') {
recordAgentEvent({
transport: 'ws',
opId,
agentEvent: parsed.event,
eventId: parsed.id,
rawLen,
});
} else {
events.push({
t: now(),
transport: 'ws',
opIdTail: (opId ?? '').slice(-10),
type: '_SERVER_MSG_',
serverType: parsed.type,
rawLen,
});
}
}
// Wrap the constructor. Instance `constructor` will still reflect OrigWS
// (we share prototypes), so use the `_WS_OPEN_` sentinel events to confirm
// the patch is firing.
function PatchedWebSocket(this: WebSocket, url: string | URL, protocols?: string | string[]) {
const ws: WebSocket = protocols == null ? new OrigWS(url) : new OrigWS(url, protocols);
const opId = extractOpIdFromWsUrl(url);
if (!isGatewayWs(url)) return ws;
events.push({
t: now(),
transport: 'ws',
opIdTail: (opId ?? '').slice(-10),
type: '_WS_OPEN_',
url: String(url),
});
// One observer listener that always fires, regardless of how the consumer
// (AgentStreamClient uses `ws.onmessage = …`) subscribes.
ws.addEventListener('message', (e) => {
try {
handleWsFrame((e as MessageEvent).data, opId);
} catch {
/* swallow */
}
});
ws.addEventListener('close', () => {
events.push({
t: now(),
transport: 'ws',
opIdTail: (opId ?? '').slice(-10),
type: '_WS_CLOSE_',
});
});
return ws;
}
// Preserve prototype + static fields so `instanceof WebSocket` and
// `WebSocket.OPEN` constants still work.
(PatchedWebSocket as unknown as { prototype: WebSocket }).prototype = OrigWS.prototype;
for (const k of Object.keys(OrigWS) as Array<keyof typeof OrigWS>) {
try {
(PatchedWebSocket as any)[k] = (OrigWS as any)[k];
} catch {
/* readonly */
}
}
(['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'] as const).forEach((k) => {
(PatchedWebSocket as any)[k] = (OrigWS as any)[k];
});
w.WebSocket = PatchedWebSocket as unknown as typeof WebSocket;
// ── 2. Patch window.fetch for `/api/agent/stream` (direct-mode SSE) ─
if (!w.__PROBE_ORIG_FETCH) w.__PROBE_ORIG_FETCH = w.fetch.bind(w);
const origFetch = w.__PROBE_ORIG_FETCH;
function isAgentStreamUrl(input: RequestInfo | URL): boolean {
let url = '';
if (typeof input === 'string') url = input;
else if (input instanceof URL) url = input.toString();
else if (input && typeof (input as Request).url === 'string') url = (input as Request).url;
return url.includes('/api/agent/stream');
}
function extractOpIdFromHttpUrl(input: RequestInfo | URL): string | null {
const url = typeof input === 'string' ? input : (input as Request | URL).toString();
const m = url.match(/operationId=([^&]+)/);
return m ? decodeURIComponent(m[1]) : null;
}
function pushFromSSEFrame(rawFrame: string, opId: string | null): void {
const lines = rawFrame.split('\n');
let dataJson = '';
let evtName = 'message';
for (const line of lines) {
if (line.startsWith('event:')) evtName = line.slice(6).trim();
else if (line.startsWith('data:')) dataJson += line.slice(5).trim();
}
if (!dataJson) return;
let parsed: any;
try {
parsed = JSON.parse(dataJson);
} catch {
events.push({
t: now(),
transport: 'sse',
opIdTail: (opId ?? '').slice(-10),
type: '_PARSE_ERROR_',
sseEvent: evtName,
raw: dataJson.length > 400 ? dataJson.slice(0, 400) + '…' : dataJson,
});
return;
}
recordAgentEvent({
transport: 'sse',
opId,
agentEvent: parsed,
eventId: null,
rawLen: dataJson.length,
});
}
async function teeAndDrain(response: Response, opId: string | null): Promise<Response> {
if (!response.body) return response;
const [a, b] = response.body.tee();
void (async () => {
const reader = b.getReader();
const decoder = new TextDecoder();
let buf = '';
try {
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
let idx: number;
while ((idx = buf.indexOf('\n\n')) !== -1) {
const frame = buf.slice(0, idx);
buf = buf.slice(idx + 2);
if (frame.trim()) pushFromSSEFrame(frame, opId);
}
}
if (buf.trim()) pushFromSSEFrame(buf, opId);
} catch (e: any) {
events.push({
t: now(),
transport: 'sse',
opIdTail: (opId ?? '').slice(-10),
type: '_TEE_ERROR_',
message: String(e?.message ?? e),
});
}
})();
return new Response(a, {
headers: response.headers,
status: response.status,
statusText: response.statusText,
});
}
w.fetch = async function patchedFetch(input: RequestInfo | URL, init?: RequestInit) {
const response = await origFetch(input as any, init);
if (!isAgentStreamUrl(input)) return response;
const opId = extractOpIdFromHttpUrl(input);
const url =
typeof input === 'string'
? input.split('?')[0]
: (input as Request | URL).toString().split('?')[0];
events.push({
t: now(),
transport: 'sse',
opIdTail: (opId ?? '').slice(-10),
type: '_CONNECTED_',
url,
status: response.status,
});
return teeAndDrain(response, opId);
} as typeof fetch;
// ── 3. Wrap store actions (best-effort for "who called replace") ────
// Side-global stash for the original chat-store actions. Re-installs ALWAYS
// rewrap from the originals so updates to the probe body take effect
// without a page reload — using only a `__probeWrapped` flag on the chat
// state object would freeze the first-installed wrapper across re-installs.
declare global {
interface Window {
__PROBE_ORIG_REFRESH_MESSAGES?: any;
__PROBE_ORIG_REPLACE_MESSAGES?: any;
}
}
try {
const chat = w.__LOBE_STORES?.chat?.();
if (chat) {
// First-time install: cache the originals. Re-install: restore from
// the cached originals before wrapping again.
if (!w.__PROBE_ORIG_REFRESH_MESSAGES) w.__PROBE_ORIG_REFRESH_MESSAGES = chat.refreshMessages;
if (!w.__PROBE_ORIG_REPLACE_MESSAGES) w.__PROBE_ORIG_REPLACE_MESSAGES = chat.replaceMessages;
const origRefresh = w.__PROBE_ORIG_REFRESH_MESSAGES;
const origReplace = w.__PROBE_ORIG_REPLACE_MESSAGES;
chat.refreshMessages = origRefresh;
chat.replaceMessages = origReplace;
chat.refreshMessages = async function probeRefresh(this: unknown, ...args: any[]) {
calls.push({
t: now(),
name: 'refreshMessages',
args: { context: args[0] ?? null },
stack: shortStack(),
});
return origRefresh.apply(this, args);
};
chat.replaceMessages = function probeReplace(this: unknown, ...args: any[]) {
const msgs = (args[0] as any[]) ?? [];
const snapshot = msgs.slice(-2).map((m) => ({
id: (m.id ?? '').slice(-8),
role: m.role,
cLen: (m.content ?? '').length,
rLen: (m.reasoning?.content ?? '').length,
updatedAt: m.updatedAt,
}));
calls.push({
t: now(),
name: 'replaceMessages',
args: { count: msgs.length, params: args[1] ?? null, snapshot } as any,
stack: shortStack(),
});
// Pair the call with a mutation row so the analyzer can build a
// single ordered timeline across replaceMessages + dispatchMessage.
const stackTop = shortStack().split(' ← ')[0]?.slice(0, 80);
const last = msgs.at(-1);
const lastSum = last
? {
id: (last.id ?? '').slice(-8),
role: last.role,
cLen: (last.content ?? '').length,
rLen: (last.reasoning?.content ?? '').length,
updatedAt: last.updatedAt,
}
: undefined;
const params: any = args[1] ?? {};
const ctxKey = params.context
? `main_${params.context.agentId ?? '?'}_${
params.context.topicId ? 'tpc_' + params.context.topicId : 'new'
}`.replace('main_tpc_', 'main_') // crude key inference
: '(no-ctx)';
mutations.push({
t: now(),
key: ctxKey,
n: msgs.length,
last: lastSum,
delta: `replaceMessages(action=${params.action ?? '-'}) src=${stackTop ?? '-'}`,
});
return origReplace.apply(this, args);
};
}
} catch (e: any) {
calls.push({ t: now(), name: '_WRAP_ERROR_', error: String(e?.message ?? e) });
}
// ── 3.5. Mutation log — wrap the TWO ChatStore writers (replaceMessages,
// internal_dispatchMessage) to record EVERY dbMessagesMap[key] reference
// change with a one-line "before/after last assistant message" delta. This
// reveals dispatchMessage-driven collapses that the replaceMessages wrap
// alone cannot see.
declare global {
interface Window {
__PROBE_ORIG_DISPATCH_MESSAGE?: any;
}
}
try {
const chat = w.__LOBE_STORES?.chat?.();
if (chat?.internal_dispatchMessage) {
if (!w.__PROBE_ORIG_DISPATCH_MESSAGE)
w.__PROBE_ORIG_DISPATCH_MESSAGE = chat.internal_dispatchMessage;
const origDispatch = w.__PROBE_ORIG_DISPATCH_MESSAGE;
chat.internal_dispatchMessage = origDispatch;
chat.internal_dispatchMessage = function probeDispatch(this: unknown, payload: any, ctx?: any) {
// Snapshot BEFORE — read the would-be target key + last message.
const before = (() => {
try {
const state = w.__LOBE_STORES?.chat?.();
if (!state) return null;
// Replicate state.internal_getConversationContext logic enough to
// resolve a key — but most callers pass operationId on ctx, and
// operationId-keyed lookup needs store internals. Easiest: snapshot
// ALL keys' last-assistant cLen and compare BEFORE vs AFTER below.
const map = state.dbMessagesMap ?? {};
const out: Record<string, any> = {};
for (const k of Object.keys(map)) {
const last = (map[k] ?? []).at(-1);
out[k] = last
? {
id: (last.id ?? '').slice(-8),
cLen: (last.content ?? '').length,
rLen: (last.reasoning?.content ?? '').length,
n: map[k].length,
}
: { n: 0 };
}
return out;
} catch {
return null;
}
})();
const result = origDispatch.apply(this, [payload, ctx]);
// Snapshot AFTER — find which key(s) actually changed.
try {
const state = w.__LOBE_STORES?.chat?.();
if (state && before) {
const map = state.dbMessagesMap ?? {};
for (const k of Object.keys(map)) {
const last = (map[k] ?? []).at(-1);
const beforeSnap = before[k];
const afterSnap = last
? {
id: (last.id ?? '').slice(-8),
cLen: (last.content ?? '').length,
rLen: (last.reasoning?.content ?? '').length,
n: map[k].length,
}
: { n: 0 };
const changed =
!beforeSnap ||
beforeSnap.n !== afterSnap.n ||
beforeSnap.id !== (afterSnap as any).id ||
beforeSnap.cLen !== (afterSnap as any).cLen ||
beforeSnap.rLen !== (afterSnap as any).rLen;
if (!changed) continue;
let delta = '';
if (beforeSnap?.id !== undefined && beforeSnap.id !== (afterSnap as any).id)
delta += `id:${beforeSnap.id}${(afterSnap as any).id};`;
if (
beforeSnap?.cLen !== undefined &&
(afterSnap as any).cLen !== undefined &&
(afterSnap as any).cLen < beforeSnap.cLen
)
delta += `cLen↓${beforeSnap.cLen}${(afterSnap as any).cLen};`;
if (
beforeSnap?.rLen !== undefined &&
(afterSnap as any).rLen !== undefined &&
(afterSnap as any).rLen < beforeSnap.rLen
)
delta += `rLen↓${beforeSnap.rLen}${(afterSnap as any).rLen};`;
if (beforeSnap?.n !== undefined && afterSnap.n < beforeSnap.n)
delta += `n↓${beforeSnap.n}${afterSnap.n};`;
mutations.push({
t: now(),
key: k,
n: afterSnap.n,
last: (afterSnap as any).id ? (afterSnap as any) : undefined,
prevLast: beforeSnap?.id ? beforeSnap : undefined,
delta: delta || `dispatch:${payload?.type}`,
});
}
}
} catch (e: any) {
mutations.push({
t: now(),
key: '_DISPATCH_PROBE_ERROR_',
n: -1,
delta: String(e?.message ?? e),
});
}
return result;
};
}
} catch (e: any) {
calls.push({ t: now(), name: '_DISPATCH_WRAP_ERROR_', error: String(e?.message ?? e) });
}
// ── 4. Periodic per-key timeline snapshots ─────────────────────────
function captureTimeline(): void {
try {
const c = w.__LOBE_STORES?.chat?.();
if (!c) return;
const msgsMap = (c.messagesMap ?? {}) as Record<string, any[]>;
const dbMap = (c.dbMessagesMap ?? {}) as Record<string, any[]>;
const byKey: ProbeTimelineSample['byKey'] = {};
for (const k of Object.keys(msgsMap)) {
const display = msgsMap[k] ?? [];
const db = dbMap[k] ?? [];
if (display.length === 0 && db.length === 0) continue;
byKey[k] = {
n: display.length,
dbN: db.length,
msgs: summarizeMessages(display),
};
}
const ops = Object.values((c.operations ?? {}) as Record<string, any>);
timeline.push({
t: now(),
activeTopic: ((c.activeTopicId as string | null) ?? '').slice(-10) || null,
keys: Object.keys(byKey),
byKey,
runOps: ops.filter((o: any) => o.status === 'running').length,
});
} catch (e: any) {
timeline.push({
t: now(),
activeTopic: null,
keys: [],
byKey: {},
runOps: 0,
err: e?.message ?? String(e),
});
}
}
captureTimeline();
if (w.__PROBE_TIMELINE_TIMER) clearInterval(w.__PROBE_TIMELINE_TIMER);
w.__PROBE_TIMELINE_TIMER = setInterval(captureTimeline, 200);
// ── 5. Tab-switch helpers ──────────────────────────────────────────
function listTopBarTabs(): HTMLElement[] {
return Array.from(
document.querySelectorAll<HTMLElement>(
'[data-insp-path*="TabItem.tsx"][data-contextmenu-trigger]',
),
).filter((t) => t.getBoundingClientRect().top < 30);
}
w.__listTabs = () =>
listTopBarTabs().map((t, i) => ({
i,
key: t.getAttribute('data-contextmenu-trigger'),
active: t.getAttribute('data-active') === 'true',
title: (t.innerText ?? '').slice(0, 60),
}));
w.__clickTabByKey = (key: string) => {
const tab = listTopBarTabs().find((t) => t.getAttribute('data-contextmenu-trigger') === key);
if (!tab) return 'not found: ' + key;
if (tab.getAttribute('data-active') === 'true') return 'already active: ' + key;
tab.click();
return 'clicked key=' + key;
};
w.__PROBE_EVENT = (name: string) => {
calls.push({ t: now(), name: 'MARK:' + name });
};
// `run.ts` wraps the bundle in an IIFE and appends a `return <confirmation>`
// after the bundle body — agent-browser then prints the confirmation back to
// the operator. Nothing to do here at the end of the module body.
@@ -0,0 +1,211 @@
// CLI for the agent-gateway probe.
//
// Bundles the TS probes with esbuild, pipes them into `agent-browser eval`,
// and persists dumps under `.agent-gateway/` (gitignored) for later use as
// streaming-replay test fixtures.
//
// Commands:
// bun run .agents/skills/local-testing/scripts/agent-gateway/run.ts install
// Bundle probe-events.ts and inject into the CDP-attached browser.
// Re-installing clears all buffers and re-patches WebSocket / fetch.
//
// bun run .agents/skills/local-testing/scripts/agent-gateway/run.ts dump [name]
// Stop the timeline timer, fetch the capture as JSON, write it to
// `.agent-gateway/<name>-<YYYYMMDD-HHmmss>.json`. `name` defaults to
// `dump`. Prints the absolute path written.
//
// bun run .agents/skills/local-testing/scripts/agent-gateway/run.ts analyze [path]
// Run analyze-events.ts on the dump. `path` defaults to the most
// recently modified file in `.agent-gateway/`.
//
// Optional flags:
// --cdp <port> CDP port (default 9222)
// --browser <bin> agent-browser binary (default 'agent-browser')
import { spawn } from 'node:child_process';
import { mkdirSync, readdirSync, statSync, writeFileSync } from 'node:fs';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
const SCRIPT_DIR = path.dirname(fileURLToPath(import.meta.url));
// .agents/skills/local-testing/scripts/agent-gateway/ → 5 levels up
const PROJECT_ROOT = path.resolve(SCRIPT_DIR, '../../../../..');
const DUMP_DIR = path.join(PROJECT_ROOT, '.agent-gateway');
interface Flags {
browser: string;
cdp: string;
positional: string[];
}
function parseFlags(argv: string[]): Flags {
const out: Flags = { cdp: '9222', browser: 'agent-browser', positional: [] };
for (let i = 0; i < argv.length; i++) {
const a = argv[i];
if (a === '--cdp') out.cdp = argv[++i] ?? out.cdp;
else if (a === '--browser') out.browser = argv[++i] ?? out.browser;
else out.positional.push(a);
}
return out;
}
async function bundle(entry: string): Promise<string> {
// Bun.build is built into the Bun runtime — no external dep needed.
const r = await Bun.build({
entrypoints: [path.join(SCRIPT_DIR, entry)],
target: 'browser',
format: 'esm',
minify: false,
});
if (!r.success) {
const msgs = r.logs.map((l) => `${l.level}: ${l.message}`).join('\n');
throw new Error(`bundle failed for ${entry}:\n${msgs}`);
}
return await r.outputs[0].text();
}
function wrapIife(body: string, returnExpr: string): string {
// Wrap as an IIFE that swallows the bundled top-level (top-level `const`
// declarations get scoped to the IIFE, so re-injection doesn't conflict)
// and returns the configured expression — which `agent-browser eval`
// captures and prints to stdout.
return `(() => {\n${body}\n;return ${returnExpr};\n})()`;
}
function runAgentBrowserEval(flags: Flags, script: string): Promise<string> {
return new Promise((resolveP, rejectP) => {
const child = spawn(flags.browser, ['--cdp', flags.cdp, 'eval', '--stdin'], {
stdio: ['pipe', 'pipe', 'inherit'],
});
let stdout = '';
child.stdout.on('data', (chunk: Buffer) => {
stdout += chunk.toString('utf8');
});
child.on('error', rejectP);
child.on('close', (code) => {
if (code === 0) resolveP(stdout);
else rejectP(new Error(`agent-browser exited ${code}`));
});
child.stdin.write(script);
child.stdin.end();
});
}
// agent-browser prints eval results as JSON (string values are quoted).
function unquoteAgentBrowserResult(raw: string): string {
const trimmed = raw.trim();
if (trimmed.startsWith('"') && trimmed.endsWith('"')) {
try {
return JSON.parse(trimmed) as string;
} catch {
/* fall through */
}
}
return trimmed;
}
function isoStamp(): string {
const d = new Date();
const yyyy = d.getFullYear();
const mm = String(d.getMonth() + 1).padStart(2, '0');
const dd = String(d.getDate()).padStart(2, '0');
const hh = String(d.getHours()).padStart(2, '0');
const mi = String(d.getMinutes()).padStart(2, '0');
const ss = String(d.getSeconds()).padStart(2, '0');
return `${yyyy}${mm}${dd}-${hh}${mi}${ss}`;
}
function ensureDumpDir(): void {
mkdirSync(DUMP_DIR, { recursive: true });
}
function latestDump(): string | null {
ensureDumpDir();
const entries = readdirSync(DUMP_DIR)
.filter((f) => f.endsWith('.json'))
.map((f) => ({ f, mtime: statSync(path.join(DUMP_DIR, f)).mtimeMs }))
.sort((a, b) => b.mtime - a.mtime);
return entries[0] ? path.join(DUMP_DIR, entries[0].f) : null;
}
// ── Commands ────────────────────────────────────────────────────────
async function cmdInstall(flags: Flags): Promise<void> {
const body = await bundle('probe-events.ts');
const installMsg = JSON.stringify(
'events probe installed: WebSocket+fetch interception. ' +
'WS captures operationId= sockets (gateway), fetch captures /api/agent/stream (direct).',
);
const script = wrapIife(body, installMsg);
const out = await runAgentBrowserEval(flags, script);
console.log(unquoteAgentBrowserResult(out));
}
async function cmdDump(flags: Flags): Promise<void> {
const name = flags.positional[1] ?? 'dump';
const body = await bundle('probe-dump.ts');
const script = wrapIife(body, 'window.__PROBE_LAST_DUMP_JSON');
const raw = await runAgentBrowserEval(flags, script);
const json = unquoteAgentBrowserResult(raw);
ensureDumpDir();
const filename = `${name}-${isoStamp()}.json`;
const dumpPath = path.join(DUMP_DIR, filename);
writeFileSync(dumpPath, json, 'utf8');
// Validate by parsing the meta header so we error early on bad capture
try {
const parsed = JSON.parse(json) as {
meta?: { eventCount?: number; callCount?: number; sampleCount?: number };
};
const meta = parsed.meta ?? {};
console.log(
`wrote ${dumpPath} (${json.length} bytes events=${meta.eventCount ?? '?'} ` +
`calls=${meta.callCount ?? '?'} samples=${meta.sampleCount ?? '?'})`,
);
} catch {
console.log(`wrote ${dumpPath} (${json.length} bytes — JSON.parse failed; see file)`);
}
}
async function cmdAnalyze(flags: Flags): Promise<void> {
const target = flags.positional[1] ?? latestDump();
if (!target) {
console.error('no dump file found. run `dump` first or pass a path.');
process.exit(1);
}
const child = spawn('bun', ['run', path.join(SCRIPT_DIR, 'analyze-events.ts'), target], {
stdio: 'inherit',
});
await new Promise<void>((resolveP, rejectP) => {
child.on('error', rejectP);
child.on('close', (code) => (code === 0 ? resolveP() : rejectP(new Error(`exit ${code}`))));
});
}
// ── Entry point ─────────────────────────────────────────────────────
const flags = parseFlags(process.argv.slice(2));
const cmd = flags.positional[0];
const usage = `usage:
bun run run.ts install [--cdp 9222]
bun run run.ts dump [name] [--cdp 9222]
bun run run.ts analyze [path]
`;
if (!cmd) {
console.error(usage);
process.exit(1);
}
try {
if (cmd === 'install') await cmdInstall(flags);
else if (cmd === 'dump') await cmdDump(flags);
else if (cmd === 'analyze') await cmdAnalyze(flags);
else {
console.error(`unknown command: ${cmd}\n\n${usage}`);
process.exit(1);
}
} catch (e: any) {
console.error(e?.stack ?? e);
process.exit(1);
}
@@ -0,0 +1,113 @@
// Shared types between the in-browser probe and the Node-side analyzer.
// Kept tiny on purpose — anything the analyzer can re-derive is left off.
export interface ProbeStreamEvent {
/** Summarized payload — long strings truncated, arrays printed as Array(N) */
data?: Record<string, unknown>;
/** Keys present on the event's `data` payload — useful at a glance */
dataKeys?: string[];
/** ServerMessage.id — gateway WS frames carry an event-id we may resume from */
eventId?: string | null;
message?: string;
/** Last 10 chars of the operationId (full id is excessively long) */
opIdTail: string;
raw?: string;
/** Raw frame byte length, when applicable */
rawLen?: number;
/** For non-agent_event server frames (auth_success, heartbeat_ack, …) */
serverType?: string;
sseEvent?: string;
status?: number;
stepIndex?: number;
/** Milliseconds since the probe's t0 (install time). */
t: number;
/** 'ws' for gateway WebSocket frames, 'sse' for direct /api/agent/stream */
transport: 'ws' | 'sse';
/** Either the AgentStreamEvent.type, or a probe sentinel like `_WS_OPEN_` */
type: string;
url?: string;
}
export interface ProbeActionCall {
args?: {
count?: number;
context?: unknown;
params?: unknown;
};
error?: string;
/** `replaceMessages` / `refreshMessages` / `MARK:<label>` / `_WRAP_ERROR_` */
name: string;
stack?: string;
t: number;
}
export interface ProbeMessageSummary {
/** children.length */
chN: number;
/** content.length */
cLen: number;
/** Last 8 chars of the message id */
id: string;
/** reasoning.content.length */
rLen: number;
role: string;
/** tools.length */
tools: number;
}
export interface ProbeTimelineSample {
/** Last 10 chars of activeTopicId, or null */
activeTopic: string | null;
/** Per-key breakdown: display count, db count, message summaries */
byKey: Record<
string,
{
n: number;
dbN: number;
msgs: ProbeMessageSummary[];
}
>;
err?: string;
/** All messagesMap keys that have content at this moment */
keys: string[];
/** Number of operations in 'running' status */
runOps: number;
t: number;
}
export interface ProbeDumpMeta {
callCount: number;
/** Date.now() at dump call */
collectedAt: number;
eventCount: number;
sampleCount: number;
/** Date.now() at probe install */
t0: number;
}
export interface ProbeDump {
actionCalls: ProbeActionCall[];
meta: ProbeDumpMeta;
streamEvents: ProbeStreamEvent[];
timeline: ProbeTimelineSample[];
}
/**
* Globals the probe attaches to `window`. Keeps `as any` casts at the boundary
* instead of sprinkling them through the probe body.
*/
declare global {
interface Window {
__clickTabByKey?: (key: string) => string;
__listTabs?: () => Array<{ i: number; key: string | null; active: boolean; title: string }>;
__LOBE_STORES?: Record<string, () => any>;
__PROBE_ACTION_CALLS?: ProbeActionCall[];
__PROBE_EVENT?: (label: string) => void;
__PROBE_MSG_TIMELINE?: ProbeTimelineSample[];
__PROBE_ORIG_FETCH?: typeof fetch;
__PROBE_ORIG_WEBSOCKET?: typeof WebSocket;
__PROBE_STREAM_EVENTS?: ProbeStreamEvent[];
__PROBE_T0?: number;
__PROBE_TIMELINE_TIMER?: ReturnType<typeof setInterval> | null;
}
}