Compare commits

...

4 Commits

Author SHA1 Message Date
arvinxx 5e7b1b7822 🐛 fix: address review issues in AgentGatewayClient implementation
- Add IAgentGatewayClient interface for testability and decoupling
- Read gateway URL from server config instead of hardcoding
- Fix cleanup() destroying consumer event listeners on disconnect
- Wire tool confirmation and user input through WebSocket for gateway sessions
- Add gateway availability check (feature flag via agentGatewayUrl config)
- Add heartbeat ack timeout detection for dead connections
- Add maxReconnectAttempts to prevent infinite retry loops
- Log warning on malformed messages instead of silent ignore
- Add dispose() method for full cleanup vs disconnect preserving listeners
- Add comprehensive unit tests for AgentGatewayClient (32 tests)
- Add tsconfig.json and vitest.config.mts for the package
- Update agentGroup tests to match new gateway integration pattern

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 00:46:04 +08:00
arvinxx d27cb4e087 🔥 refactor: delete SSE agent stream infrastructure, update tests
- Delete /api/agent/stream SSE endpoint (never went to production)
- Delete AgentRuntimeClient SSE stream client
- Remove agentRuntimeClient export from services
- Update agentGroup tests to mock gateway client instead of SSE
- Add agentGatewayUrl to GlobalServerConfig type

LOBE-6537

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 00:46:04 +08:00
arvinxx c2dfda2f06 feat: integrate AgentGatewayClient into chat store
Replace SSE stream connection with WebSocket gateway in group agent execution.
Add internal_connectAgentGateway and gateway event handlers to runAgent slice.

LOBE-6537

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 00:46:04 +08:00
arvinxx 58a91a229f feat: add @lobechat/agent-gateway-client package
Browser WebSocket client for agent-gateway (Cloudflare Durable Object),
providing real-time bidirectional communication for server-side agent execution.

LOBE-6537

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 00:45:02 +08:00
17 changed files with 1585 additions and 1496 deletions
+1
View File
@@ -198,6 +198,7 @@
"@icons-pack/react-simple-icons": "^13.8.0",
"@khmyznikov/pwa-install": "0.3.9",
"@lexical/utils": "^0.42.0",
"@lobechat/agent-gateway-client": "workspace:*",
"@lobechat/agent-runtime": "workspace:*",
"@lobechat/agent-templates": "workspace:*",
"@lobechat/builtin-agent-onboarding": "workspace:*",
@@ -0,0 +1,16 @@
{
"name": "@lobechat/agent-gateway-client",
"version": "1.0.0",
"private": true,
"exports": {
".": "./src/index.ts"
},
"main": "./src/index.ts",
"scripts": {
"test": "bunx vitest run --silent='passed-only'",
"test:coverage": "bunx vitest run --coverage"
},
"devDependencies": {
"vitest": "^3.0.0"
}
}
@@ -0,0 +1,654 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { AgentGatewayClient } from '../client';
import type { ServerMessage } from '../types';
// ─── WebSocket Mock ───
class MockWebSocket {
static CONNECTING = 0;
static OPEN = 1;
static CLOSING = 2;
static CLOSED = 3;
readyState = MockWebSocket.CONNECTING;
url: string;
private eventListeners = new Map<string, Set<(...args: any[]) => void>>();
constructor(url: string) {
this.url = url;
// Simulate async open via microtask (not affected by fake timers)
Promise.resolve().then(() => {
if (this.readyState === MockWebSocket.CONNECTING) {
this.readyState = MockWebSocket.OPEN;
this.dispatchEvent('open');
}
});
}
addEventListener(event: string, listener: (...args: any[]) => void): void {
let set = this.eventListeners.get(event);
if (!set) {
set = new Set();
this.eventListeners.set(event, set);
}
set.add(listener);
}
removeEventListener(event: string, listener: (...args: any[]) => void): void {
this.eventListeners.get(event)?.delete(listener);
}
send = vi.fn();
close = vi.fn((code?: number, reason?: string) => {
this.readyState = MockWebSocket.CLOSED;
this.dispatchEvent('close', { code, reason });
});
// Test helpers
dispatchEvent(event: string, data?: any): void {
const set = this.eventListeners.get(event);
if (set) {
for (const listener of set) {
listener(data || {});
}
}
}
simulateMessage(message: ServerMessage): void {
this.dispatchEvent('message', { data: JSON.stringify(message) });
}
}
let mockWsInstances: MockWebSocket[] = [];
beforeEach(() => {
mockWsInstances = [];
vi.stubGlobal(
'WebSocket',
Object.assign(
class extends MockWebSocket {
constructor(url: string) {
super(url);
mockWsInstances.push(this);
}
},
{
CONNECTING: MockWebSocket.CONNECTING,
OPEN: MockWebSocket.OPEN,
CLOSING: MockWebSocket.CLOSING,
CLOSED: MockWebSocket.CLOSED,
},
),
);
});
afterEach(() => {
vi.restoreAllMocks();
vi.useRealTimers();
});
const createClient = (overrides?: Partial<ConstructorParameters<typeof AgentGatewayClient>[0]>) =>
new AgentGatewayClient({
gatewayUrl: 'https://gateway.example.com',
token: 'test-token',
...overrides,
});
const getLastWs = () => mockWsInstances.at(-1)!;
describe('AgentGatewayClient', () => {
describe('constructor', () => {
it('should initialize with disconnected status', () => {
const client = createClient();
expect(client.connectionStatus).toBe('disconnected');
});
});
describe('connect', () => {
it('should create WebSocket with correct URL', () => {
const client = createClient();
client.connect('test-chat-key');
const ws = getLastWs();
expect(ws.url).toBe('wss://gateway.example.com/ws?chatKey=test-chat-key');
});
it('should use ws:// for http:// gateway URLs', () => {
const client = createClient({ gatewayUrl: 'http://localhost:8080' });
client.connect('key');
const ws = getLastWs();
expect(ws.url).toBe('ws://localhost:8080/ws?chatKey=key');
});
it('should set status to connecting', () => {
const client = createClient();
client.connect('key');
expect(client.connectionStatus).toBe('connecting');
});
it('should not create multiple connections', () => {
const client = createClient();
client.connect('key');
client.connect('key'); // second call should be no-op
expect(mockWsInstances).toHaveLength(1);
});
it('should send auth message on open', async () => {
const client = createClient();
client.connect('key');
// Wait for async open
await vi.waitFor(() => {
expect(getLastWs().send).toHaveBeenCalledWith(
JSON.stringify({ token: 'test-token', type: 'auth' }),
);
});
});
it('should set status to authenticating on open', async () => {
const client = createClient();
client.connect('key');
await vi.waitFor(() => {
expect(client.connectionStatus).toBe('authenticating');
});
});
});
describe('authentication', () => {
it('should emit connected and set status on auth_success', async () => {
const client = createClient();
const connectedHandler = vi.fn();
client.on('connected', connectedHandler);
client.connect('key');
await vi.waitFor(() => {
expect(client.connectionStatus).toBe('authenticating');
});
getLastWs().simulateMessage({ type: 'auth_success' });
expect(client.connectionStatus).toBe('connected');
expect(connectedHandler).toHaveBeenCalledOnce();
});
it('should emit auth_failed and disconnect on auth_failed', async () => {
const client = createClient();
const authFailedHandler = vi.fn();
client.on('auth_failed', authFailedHandler);
client.connect('key');
await vi.waitFor(() => {
expect(client.connectionStatus).toBe('authenticating');
});
getLastWs().simulateMessage({ type: 'auth_failed', reason: 'Invalid token' });
expect(authFailedHandler).toHaveBeenCalledWith('Invalid token');
expect(client.connectionStatus).toBe('disconnected');
});
});
describe('disconnect', () => {
it('should close WebSocket and set status to disconnected', async () => {
const client = createClient();
client.connect('key');
await vi.waitFor(() => {
expect(client.connectionStatus).toBe('authenticating');
});
client.disconnect();
expect(client.connectionStatus).toBe('disconnected');
expect(getLastWs().close).toHaveBeenCalledWith(1000, 'Client disconnect');
});
it('should preserve event listeners after disconnect', async () => {
const client = createClient();
const handler = vi.fn();
client.on('connected', handler);
client.connect('key');
await vi.waitFor(() => expect(client.connectionStatus).toBe('authenticating'));
client.disconnect();
// Re-connect and verify handler still works
client.connect('key2');
await vi.waitFor(() => expect(client.connectionStatus).toBe('authenticating'));
getLastWs().simulateMessage({ type: 'auth_success' });
expect(handler).toHaveBeenCalledOnce();
});
});
describe('dispose', () => {
it('should disconnect and remove all listeners', async () => {
const client = createClient();
const handler = vi.fn();
client.on('connected', handler);
client.connect('key');
await vi.waitFor(() => expect(client.connectionStatus).toBe('authenticating'));
client.dispose();
expect(client.connectionStatus).toBe('disconnected');
});
});
describe('event handling', () => {
const setupConnectedClient = async () => {
const client = createClient();
client.connect('key');
await vi.waitFor(() => expect(client.connectionStatus).toBe('authenticating'));
getLastWs().simulateMessage({ type: 'auth_success' });
expect(client.connectionStatus).toBe('connected');
return client;
};
it('should emit agent_event', async () => {
const client = await setupConnectedClient();
const handler = vi.fn();
client.on('agent_event', handler);
const msg = {
type: 'agent_event' as const,
id: 'evt-1',
event: { kind: 'text_delta' as const, content: 'hello' },
};
getLastWs().simulateMessage(msg);
expect(handler).toHaveBeenCalledWith(msg);
});
it('should emit tool_confirmation_request', async () => {
const client = await setupConnectedClient();
const handler = vi.fn();
client.on('tool_confirmation_request', handler);
const msg = {
type: 'tool_confirmation_request' as const,
id: 'evt-2',
toolCallId: 'tc-1',
tool: { apiName: 'test', arguments: '{}', identifier: 'test', name: 'test' },
};
getLastWs().simulateMessage(msg);
expect(handler).toHaveBeenCalledWith(msg);
});
it('should emit input_request', async () => {
const client = await setupConnectedClient();
const handler = vi.fn();
client.on('input_request', handler);
const msg = {
type: 'input_request' as const,
id: 'evt-3',
requestId: 'req-1',
prompt: 'Enter value',
};
getLastWs().simulateMessage(msg);
expect(handler).toHaveBeenCalledWith(msg);
});
it('should emit session_complete', async () => {
const client = await setupConnectedClient();
const handler = vi.fn();
client.on('session_complete', handler);
const msg = {
type: 'session_complete' as const,
id: 'evt-4',
summary: 'Done',
};
getLastWs().simulateMessage(msg);
expect(handler).toHaveBeenCalledWith(msg);
});
it('should emit error for error messages', async () => {
const client = await setupConnectedClient();
const handler = vi.fn();
client.on('error', handler);
getLastWs().simulateMessage({
type: 'error',
id: 'evt-5',
code: 'ERR',
message: 'Something failed',
});
expect(handler).toHaveBeenCalledWith(expect.any(Error));
expect((handler.mock.calls[0][0] as Error).message).toBe('Something failed');
});
it('should warn on malformed messages instead of silently ignoring', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
const client = await setupConnectedClient();
getLastWs().dispatchEvent('message', { data: 'not json' });
expect(warnSpy).toHaveBeenCalledWith(
'[AgentGatewayClient] Failed to parse message:',
expect.any(Error),
);
});
});
describe('client commands', () => {
it('should send interrupt message', async () => {
const client = createClient();
client.connect('key');
await vi.waitFor(() => expect(client.connectionStatus).toBe('authenticating'));
// Make ws OPEN
const ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.send.mockClear();
client.sendInterrupt();
expect(ws.send).toHaveBeenCalledWith(JSON.stringify({ type: 'interrupt' }));
});
it('should send tool confirmation', async () => {
const client = createClient();
client.connect('key');
const ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.send.mockClear();
client.sendToolConfirmation('tc-1', true);
expect(ws.send).toHaveBeenCalledWith(
JSON.stringify({ approved: true, toolCallId: 'tc-1', type: 'tool_confirmation' }),
);
});
it('should send user input', async () => {
const client = createClient();
client.connect('key');
const ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.send.mockClear();
client.sendUserInput('req-1', 'user response');
expect(ws.send).toHaveBeenCalledWith(
JSON.stringify({ content: 'user response', requestId: 'req-1', type: 'user_input' }),
);
});
it('should not throw when sending on closed socket', () => {
// Don't connect — socket is null
expect(() => createClient().sendInterrupt()).not.toThrow();
});
});
describe('heartbeat', () => {
it('should send heartbeat after auth_success', async () => {
vi.useFakeTimers();
const client = createClient();
client.connect('key');
const ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.dispatchEvent('open');
ws.simulateMessage({ type: 'auth_success' });
ws.send.mockClear();
// Advance past heartbeat interval
vi.advanceTimersByTime(30_000);
expect(ws.send).toHaveBeenCalledWith(JSON.stringify({ type: 'heartbeat' }));
});
it('should close connection on heartbeat ack timeout', async () => {
vi.useFakeTimers();
const client = createClient();
client.connect('key');
const ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.dispatchEvent('open');
ws.simulateMessage({ type: 'auth_success' });
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
// Send heartbeat
vi.advanceTimersByTime(30_000);
// Don't send ack — wait for timeout
vi.advanceTimersByTime(10_000);
expect(ws.close).toHaveBeenCalledWith(4000, 'Heartbeat timeout');
warnSpy.mockRestore();
});
it('should clear ack timer on heartbeat_ack', async () => {
vi.useFakeTimers();
const client = createClient();
client.connect('key');
const ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.dispatchEvent('open');
ws.simulateMessage({ type: 'auth_success' });
// Send heartbeat
vi.advanceTimersByTime(30_000);
// Ack received
ws.simulateMessage({ type: 'heartbeat_ack' });
// Advance past timeout — should NOT close
vi.advanceTimersByTime(10_000);
// close should not have been called (only from our test, not from timeout)
expect(ws.close).not.toHaveBeenCalled();
});
});
describe('reconnection', () => {
it('should reconnect on unexpected close', async () => {
vi.useFakeTimers();
const client = createClient();
const reconnectHandler = vi.fn();
client.on('reconnecting', reconnectHandler);
client.connect('key');
const ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.dispatchEvent('open');
ws.simulateMessage({ type: 'auth_success' });
// Simulate unexpected close
ws.readyState = MockWebSocket.CLOSED;
ws.dispatchEvent('close');
expect(client.connectionStatus).toBe('reconnecting');
expect(reconnectHandler).toHaveBeenCalledWith(1000); // initial delay
// Advance to trigger reconnect
vi.advanceTimersByTime(1000);
expect(mockWsInstances).toHaveLength(2);
});
it('should use exponential backoff', async () => {
vi.useFakeTimers();
const client = createClient({ maxReconnectAttempts: 0 }); // unlimited
const delays: number[] = [];
client.on('reconnecting', (delay: number) => delays.push(delay));
client.connect('key');
// First WS opens, auth succeeds, then closes unexpectedly
let ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.dispatchEvent('open');
ws.simulateMessage({ type: 'auth_success' });
// Close triggers reconnect with delay[0]
ws.readyState = MockWebSocket.CLOSED;
ws.dispatchEvent('close');
vi.advanceTimersByTime(delays.at(-1)!);
// Second WS - close again
ws = getLastWs();
ws.readyState = MockWebSocket.CLOSED;
ws.dispatchEvent('close');
vi.advanceTimersByTime(delays.at(-1)!);
// Third WS - close again
ws = getLastWs();
ws.readyState = MockWebSocket.CLOSED;
ws.dispatchEvent('close');
vi.advanceTimersByTime(delays.at(-1)!);
// Fourth WS - close again
ws = getLastWs();
ws.readyState = MockWebSocket.CLOSED;
ws.dispatchEvent('close');
expect(delays).toEqual([1000, 2000, 4000, 8000]);
});
it('should stop after max reconnect attempts', async () => {
vi.useFakeTimers();
const client = createClient({ maxReconnectAttempts: 2 });
const disconnectedHandler = vi.fn();
const errorHandler = vi.fn();
client.on('disconnected', disconnectedHandler);
client.on('error', errorHandler);
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
client.connect('key');
// First WS opens and then closes
let ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.dispatchEvent('open');
ws.simulateMessage({ type: 'auth_success' });
// Close -> reconnect attempt 1
ws.readyState = MockWebSocket.CLOSED;
ws.dispatchEvent('close');
vi.advanceTimersByTime(1000);
// Second WS close -> reconnect attempt 2
ws = getLastWs();
ws.readyState = MockWebSocket.CLOSED;
ws.dispatchEvent('close');
vi.advanceTimersByTime(2000);
// Third WS close -> attempt 3, exceeds max (2)
ws = getLastWs();
ws.readyState = MockWebSocket.CLOSED;
ws.dispatchEvent('close');
// Should stop reconnecting
expect(client.connectionStatus).toBe('disconnected');
expect(disconnectedHandler).toHaveBeenCalled();
expect(errorHandler).toHaveBeenCalledWith(
expect.objectContaining({ message: 'Max reconnect attempts reached' }),
);
warnSpy.mockRestore();
});
it('should not reconnect on intentional disconnect', async () => {
vi.useFakeTimers();
const client = createClient();
client.connect('key');
const ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
client.disconnect();
// Should not create new connections
vi.advanceTimersByTime(60_000);
expect(mockWsInstances).toHaveLength(1);
});
it('should not reconnect when autoReconnect is false', async () => {
const client = createClient({ autoReconnect: false });
const disconnectedHandler = vi.fn();
client.on('disconnected', disconnectedHandler);
client.connect('key');
const ws = getLastWs();
ws.readyState = MockWebSocket.CLOSED;
ws.dispatchEvent('close');
expect(client.connectionStatus).toBe('disconnected');
expect(disconnectedHandler).toHaveBeenCalled();
expect(mockWsInstances).toHaveLength(1);
});
});
describe('resume', () => {
it('should send resume with lastEventId on reconnect', async () => {
vi.useFakeTimers();
const client = createClient();
client.connect('key');
let ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.dispatchEvent('open');
ws.simulateMessage({ type: 'auth_success' });
// Receive an event to set lastEventId
ws.simulateMessage({
type: 'agent_event',
id: 'evt-99',
event: { kind: 'text_delta', content: 'hi' },
});
// Simulate disconnect and reconnect
ws.readyState = MockWebSocket.CLOSED;
ws.dispatchEvent('close');
vi.advanceTimersByTime(1000);
ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.dispatchEvent('open');
ws.simulateMessage({ type: 'auth_success' });
expect(ws.send).toHaveBeenCalledWith(
JSON.stringify({ lastEventId: 'evt-99', type: 'resume' }),
);
});
});
describe('updateToken', () => {
it('should use updated token on next auth', async () => {
vi.useFakeTimers();
const client = createClient({ token: 'old-token' });
client.connect('key');
let ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.dispatchEvent('open');
// First auth uses old token
expect(ws.send).toHaveBeenCalledWith(JSON.stringify({ token: 'old-token', type: 'auth' }));
// Update token and reconnect
client.updateToken('new-token');
ws.simulateMessage({ type: 'auth_failed', reason: 'expired' });
// Reconnect with new token
client.connect('key');
ws = getLastWs();
ws.readyState = MockWebSocket.OPEN;
ws.dispatchEvent('open');
expect(ws.send).toHaveBeenCalledWith(JSON.stringify({ token: 'new-token', type: 'auth' }));
});
});
});
+407
View File
@@ -0,0 +1,407 @@
import type {
AgentEventMessage,
AgentGatewayClientEvents,
ClientMessage,
ConnectionStatus,
ErrorMessage,
IAgentGatewayClient,
InputRequestMessage,
ServerMessage,
SessionCompleteMessage,
StatusChangeMessage,
ToolConfirmationRequestMessage,
} from './types';
// ─── Constants ───
const HEARTBEAT_INTERVAL = 30_000; // 30s
const HEARTBEAT_TIMEOUT = 10_000; // 10s — if no ack within this window, consider dead
const INITIAL_RECONNECT_DELAY = 1000; // 1s
const MAX_RECONNECT_DELAY = 30_000; // 30s
const DEFAULT_MAX_RECONNECT_ATTEMPTS = 10;
// ─── Minimal EventEmitter for Browser ───
type Listener = (...args: any[]) => void;
class BrowserEventEmitter {
private listeners = new Map<string, Set<Listener>>();
on(event: string, listener: Listener): void {
let set = this.listeners.get(event);
if (!set) {
set = new Set();
this.listeners.set(event, set);
}
set.add(listener);
}
off(event: string, listener: Listener): void {
this.listeners.get(event)?.delete(listener);
}
emit(event: string, ...args: any[]): void {
const set = this.listeners.get(event);
if (set) {
for (const listener of set) {
listener(...args);
}
}
}
removeAllListeners(): void {
this.listeners.clear();
}
}
// ─── Client Options ───
export interface AgentGatewayClientOptions {
/** Auto-reconnect on disconnection (default: true) */
autoReconnect?: boolean;
/** Gateway base URL (e.g. https://agent-gateway.lobehub.com) */
gatewayUrl: string;
/** Max reconnect attempts before giving up (default: 10, 0 = unlimited) */
maxReconnectAttempts?: number;
/** JWT token for authentication */
token: string;
}
// ─── Agent Gateway Client ───
export class AgentGatewayClient implements IAgentGatewayClient {
private ws: WebSocket | null = null;
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private heartbeatAckTimer: ReturnType<typeof setTimeout> | null = null;
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
private reconnectDelay = INITIAL_RECONNECT_DELAY;
private reconnectAttempts = 0;
private status: ConnectionStatus = 'disconnected';
private intentionalDisconnect = false;
private chatKey = '';
private lastEventId: string | undefined;
private gatewayUrl: string;
private token: string;
private autoReconnect: boolean;
private maxReconnectAttempts: number;
private emitter = new BrowserEventEmitter();
constructor(options: AgentGatewayClientOptions) {
this.gatewayUrl = options.gatewayUrl;
this.token = options.token;
this.autoReconnect = options.autoReconnect ?? true;
this.maxReconnectAttempts = options.maxReconnectAttempts ?? DEFAULT_MAX_RECONNECT_ATTEMPTS;
}
// ─── Public API ───
get connectionStatus(): ConnectionStatus {
return this.status;
}
on<K extends keyof AgentGatewayClientEvents>(
event: K,
listener: AgentGatewayClientEvents[K],
): void {
this.emitter.on(event, listener as Listener);
}
off<K extends keyof AgentGatewayClientEvents>(
event: K,
listener: AgentGatewayClientEvents[K],
): void {
this.emitter.off(event, listener as Listener);
}
/**
* Connect to a specific agent session.
*/
connect(chatKey: string): void {
if (this.status === 'connected' || this.status === 'connecting') return;
this.chatKey = chatKey;
this.intentionalDisconnect = false;
this.reconnectAttempts = 0;
this.doConnect();
}
disconnect(): void {
this.intentionalDisconnect = true;
this.cleanupConnection();
this.setStatus('disconnected');
}
/**
* Update the auth token (e.g. after refresh).
*/
updateToken(token: string): void {
this.token = token;
}
// ─── Client → Server Commands ───
sendInterrupt(): void {
this.send({ type: 'interrupt' });
}
sendToolConfirmation(toolCallId: string, approved: boolean): void {
this.send({ approved, toolCallId, type: 'tool_confirmation' });
}
sendUserInput(requestId: string, content: string): void {
this.send({ content, requestId, type: 'user_input' });
}
// ─── Connection Logic ───
private doConnect(): void {
this.clearReconnectTimer();
this.setStatus('connecting');
try {
const wsUrl = this.buildWsUrl();
const ws = new WebSocket(wsUrl);
ws.addEventListener('open', this.handleOpen);
ws.addEventListener('message', this.handleMessage);
ws.addEventListener('close', this.handleClose);
ws.addEventListener('error', this.handleError);
this.ws = ws;
} catch {
this.setStatus('disconnected');
if (this.autoReconnect) {
this.scheduleReconnect();
} else {
this.emitter.emit('disconnected');
}
}
}
private buildWsUrl(): string {
const wsProtocol = this.gatewayUrl.startsWith('https') ? 'wss' : 'ws';
const host = this.gatewayUrl.replace(/^https?:\/\//, '');
const params = new URLSearchParams({ chatKey: this.chatKey });
return `${wsProtocol}://${host}/ws?${params.toString()}`;
}
// ─── WebSocket Event Handlers ───
private handleOpen = (): void => {
this.reconnectDelay = INITIAL_RECONNECT_DELAY;
this.reconnectAttempts = 0;
this.setStatus('authenticating');
// Authenticate
this.send({ token: this.token, type: 'auth' });
};
private handleMessage = (ev: MessageEvent): void => {
try {
const message = JSON.parse(ev.data as string) as ServerMessage;
switch (message.type) {
case 'auth_success': {
this.setStatus('connected');
this.startHeartbeat();
this.emitter.emit('connected');
// Resume from last event if reconnecting
if (this.lastEventId) {
this.send({ lastEventId: this.lastEventId, type: 'resume' });
}
break;
}
case 'auth_failed': {
this.emitter.emit('auth_failed', message.reason);
this.disconnect();
break;
}
case 'heartbeat_ack': {
this.clearHeartbeatAckTimer();
break;
}
case 'agent_event': {
this.lastEventId = message.id;
this.emitter.emit('agent_event', message as AgentEventMessage);
break;
}
case 'status_change': {
this.lastEventId = message.id;
this.emitter.emit('status_update', message as StatusChangeMessage);
break;
}
case 'tool_confirmation_request': {
this.lastEventId = message.id;
this.emitter.emit('tool_confirmation_request', message as ToolConfirmationRequestMessage);
break;
}
case 'input_request': {
this.lastEventId = message.id;
this.emitter.emit('input_request', message as InputRequestMessage);
break;
}
case 'session_complete': {
this.lastEventId = message.id;
this.emitter.emit('session_complete', message as SessionCompleteMessage);
break;
}
case 'error': {
this.lastEventId = (message as ErrorMessage).id;
this.emitter.emit('error', new Error((message as ErrorMessage).message));
break;
}
}
} catch (error) {
console.warn('[AgentGatewayClient] Failed to parse message:', error);
}
};
private handleClose = (): void => {
this.stopHeartbeat();
this.ws = null;
if (!this.intentionalDisconnect && this.autoReconnect) {
this.setStatus('reconnecting');
this.scheduleReconnect();
} else {
this.setStatus('disconnected');
this.emitter.emit('disconnected');
}
};
private handleError = (ev: Event): void => {
this.emitter.emit('error', ev);
};
// ─── Heartbeat ───
private startHeartbeat(): void {
this.stopHeartbeat();
this.heartbeatTimer = setInterval(() => {
this.send({ type: 'heartbeat' });
this.startHeartbeatAckTimer();
}, HEARTBEAT_INTERVAL);
}
private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
this.clearHeartbeatAckTimer();
}
private startHeartbeatAckTimer(): void {
this.clearHeartbeatAckTimer();
this.heartbeatAckTimer = setTimeout(() => {
// No ack received — connection is likely dead
console.warn('[AgentGatewayClient] Heartbeat ack timeout, closing connection');
this.ws?.close(4000, 'Heartbeat timeout');
}, HEARTBEAT_TIMEOUT);
}
private clearHeartbeatAckTimer(): void {
if (this.heartbeatAckTimer) {
clearTimeout(this.heartbeatAckTimer);
this.heartbeatAckTimer = null;
}
}
// ─── Reconnection (exponential backoff) ───
private scheduleReconnect(): void {
// Check max attempts (0 = unlimited)
if (this.maxReconnectAttempts > 0 && this.reconnectAttempts >= this.maxReconnectAttempts) {
console.warn(
`[AgentGatewayClient] Max reconnect attempts (${this.maxReconnectAttempts}) reached`,
);
this.setStatus('disconnected');
this.emitter.emit('disconnected');
this.emitter.emit('error', new Error('Max reconnect attempts reached'));
return;
}
this.clearReconnectTimer();
this.reconnectAttempts++;
const delay = this.reconnectDelay;
this.emitter.emit('reconnecting', delay);
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this.doConnect();
}, delay);
// Exponential backoff: 1s → 2s → 4s → 8s → ... → 30s
this.reconnectDelay = Math.min(this.reconnectDelay * 2, MAX_RECONNECT_DELAY);
}
private clearReconnectTimer(): void {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
}
// ─── Status ───
private setStatus(status: ConnectionStatus): void {
if (this.status === status) return;
this.status = status;
this.emitter.emit('status_changed', status);
}
// ─── Helpers ───
/**
* Send a message to the gateway. Returns false if the socket is not open.
*/
private send(data: ClientMessage): boolean {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
return true;
}
return false;
}
/**
* Clean up WebSocket connection and timers, but preserve emitter listeners.
* Emitter listeners are registered by the consumer and should survive reconnects.
*/
private cleanupConnection(): void {
this.stopHeartbeat();
this.clearReconnectTimer();
if (this.ws) {
this.ws.removeEventListener('open', this.handleOpen);
this.ws.removeEventListener('message', this.handleMessage);
this.ws.removeEventListener('close', this.handleClose);
this.ws.removeEventListener('error', this.handleError);
if (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING) {
this.ws.close(1000, 'Client disconnect');
}
this.ws = null;
}
}
/**
* Full cleanup including emitter listeners. Call only when the client is being disposed.
*/
dispose(): void {
this.disconnect();
this.emitter.removeAllListeners();
}
}
@@ -0,0 +1,4 @@
export type { AgentGatewayClientOptions } from './client';
export { AgentGatewayClient } from './client';
export type { IAgentGatewayClient } from './types';
export * from './types';
+176
View File
@@ -0,0 +1,176 @@
// ─── Session State ───
export type SessionStatus =
| 'completed'
| 'error'
| 'interrupted'
| 'running'
| 'waiting_confirmation'
| 'waiting_input';
// ─── Agent Stream Events (pushed from backend via Gateway) ───
export type AgentStreamEvent =
| { content: string; kind: 'text_delta' }
| { content: string; kind: 'thinking' }
| { kind: 'message_complete'; messageId: string }
| { kind: 'step_complete'; stepIndex: number }
| { arguments: string; kind: 'tool_call_delta'; toolCallId: string }
| { kind: 'tool_call_end'; toolCallId: string }
| { arguments: string; kind: 'tool_call_start'; name: string; toolCallId: string }
| { kind: 'tool_result'; result: string; toolCallId: string };
export interface ToolCallInfo {
apiName: string;
arguments: string;
identifier: string;
name: string;
}
// ─── Client → Server Messages (Browser → Gateway) ───
export interface AuthMessage {
token: string;
type: 'auth';
}
export interface HeartbeatMessage {
type: 'heartbeat';
}
export interface InterruptMessage {
type: 'interrupt';
}
export interface ToolConfirmationMessage {
approved: boolean;
toolCallId: string;
type: 'tool_confirmation';
}
export interface UserInputMessage {
content: string;
requestId: string;
type: 'user_input';
}
export interface ResumeMessage {
lastEventId: string;
type: 'resume';
}
export type ClientMessage =
| AuthMessage
| HeartbeatMessage
| InterruptMessage
| ResumeMessage
| ToolConfirmationMessage
| UserInputMessage;
// ─── Server → Client Messages (Gateway → Browser) ───
export interface AuthSuccessMessage {
type: 'auth_success';
}
export interface AuthFailedMessage {
reason: string;
type: 'auth_failed';
}
export interface HeartbeatAckMessage {
type: 'heartbeat_ack';
}
export interface AgentEventMessage {
event: AgentStreamEvent;
id: string;
type: 'agent_event';
}
export interface StatusChangeMessage {
id: string;
status: SessionStatus;
type: 'status_change';
}
export interface ToolConfirmationRequestMessage {
id: string;
tool: ToolCallInfo;
toolCallId: string;
type: 'tool_confirmation_request';
}
export interface InputRequestMessage {
id: string;
prompt: string;
requestId: string;
type: 'input_request';
}
export interface SessionCompleteMessage {
id: string;
summary?: string;
type: 'session_complete';
}
export interface ErrorMessage {
code: string;
id: string;
message: string;
type: 'error';
}
export type ServerMessage =
| AgentEventMessage
| AuthFailedMessage
| AuthSuccessMessage
| ErrorMessage
| HeartbeatAckMessage
| InputRequestMessage
| SessionCompleteMessage
| StatusChangeMessage
| ToolConfirmationRequestMessage;
// ─── Client Types ───
export type ConnectionStatus =
| 'authenticating'
| 'connected'
| 'connecting'
| 'disconnected'
| 'reconnecting';
export interface AgentGatewayClientEvents {
agent_event: (message: AgentEventMessage) => void;
auth_failed: (reason: string) => void;
connected: () => void;
disconnected: () => void;
error: (error: Event | Error) => void;
input_request: (message: InputRequestMessage) => void;
reconnecting: (delay: number) => void;
session_complete: (message: SessionCompleteMessage) => void;
status_changed: (status: ConnectionStatus) => void;
status_update: (message: StatusChangeMessage) => void;
tool_confirmation_request: (message: ToolConfirmationRequestMessage) => void;
}
// ─── Client Interface ───
export interface IAgentGatewayClient {
connect: (chatKey: string) => void;
readonly connectionStatus: ConnectionStatus;
disconnect: () => void;
off: <K extends keyof AgentGatewayClientEvents>(
event: K,
listener: AgentGatewayClientEvents[K],
) => void;
on: <K extends keyof AgentGatewayClientEvents>(
event: K,
listener: AgentGatewayClientEvents[K],
) => void;
sendInterrupt: () => void;
sendToolConfirmation: (toolCallId: string, approved: boolean) => void;
sendUserInput: (requestId: string, content: string) => void;
updateToken: (token: string) => void;
}
@@ -0,0 +1,4 @@
{
"extends": "../../tsconfig.json",
"include": ["src/"]
}
@@ -0,0 +1,11 @@
import { defineConfig } from 'vitest/config';
export default defineConfig({
test: {
coverage: {
all: false,
reporter: ['text', 'json', 'lcov', 'text-summary'],
},
environment: 'node',
},
});
+1
View File
@@ -47,6 +47,7 @@ export interface ServerModelProviderConfig {
export type ServerLanguageModel = Partial<Record<GlobalLLMProviderKey, ServerModelProviderConfig>>;
export interface GlobalServerConfig {
agentGatewayUrl?: string;
aiProvider: ServerLanguageModel;
defaultAgent?: PartialDeep<UserDefaultAgent>;
disableEmailPassword?: boolean;
@@ -1,857 +0,0 @@
// @vitest-environment node
import { NextRequest } from 'next/server';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { GET } from '../route';
// Mock dependencies first
const mockStreamEventManager = {
getStreamHistory: vi.fn(),
subscribeStreamEvents: vi.fn(),
};
vi.mock('@/server/modules/AgentRuntime', () => ({
createStreamEventManager: vi.fn(() => mockStreamEventManager),
}));
describe('/api/agent/stream route', () => {
const MOCK_TIMESTAMP = 1758203237000;
beforeEach(() => {
vi.resetAllMocks();
// Mock Date.now to return consistent timestamp
vi.spyOn(Date, 'now').mockReturnValue(MOCK_TIMESTAMP);
});
afterEach(() => {
vi.clearAllMocks();
});
describe('GET handler', () => {
it('should return 400 when operationId parameter is missing', async () => {
const request = new NextRequest('https://test.com/api/agent/stream');
const response = await GET(request);
expect(response.status).toBe(400);
const data = await response.json();
expect(data.error).toBe('operationId parameter is required');
});
it('should return SSE stream with correct headers when operationId is provided', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation',
);
const response = await GET(request);
expect(response.status).toBe(200);
expect(response.headers.get('Content-Type')).toBe('text/event-stream');
expect(response.headers.get('Cache-Control')).toBe('no-cache, no-transform');
expect(response.headers.get('Connection')).toBe('keep-alive');
expect(response.headers.get('Access-Control-Allow-Origin')).toBe('*');
expect(response.headers.get('Access-Control-Allow-Methods')).toBe('GET');
expect(response.headers.get('Access-Control-Allow-Headers')).toBe(
'Cache-Control, Last-Event-ID',
);
expect(response.headers.get('X-Accel-Buffering')).toBe('no');
});
});
describe('Stream functionality with exact data verification', () => {
it('should send connection event in exact SSE format', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation&lastEventId=123',
);
const response = await GET(request);
const decoder = new TextDecoder();
const reader = response.body!.getReader();
// Collect all chunks
const chunks = [];
let readCount = 0;
const maxReads = 1; // Only read connection event
try {
while (readCount < maxReads) {
const readPromise = reader.read();
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Read timeout')), 1000),
);
const result = (await Promise.race([
readPromise,
timeoutPromise,
])) as ReadableStreamReadResult<Uint8Array>;
if (result.done) break;
if (result.value) {
const chunk =
result.value instanceof Uint8Array
? decoder.decode(result.value)
: String(result.value);
chunks.push(chunk);
readCount++;
}
}
} catch {
// Timeout or error
} finally {
reader.releaseLock();
}
// Verify exact stream format with mocked timestamp (new SSE format)
expect(chunks).toEqual([
`id: conn_${MOCK_TIMESTAMP}\nevent: connected\ndata: {"lastEventId":"123","operationId":"test-operation","timestamp":${MOCK_TIMESTAMP},"type":"connected"}\n\n`,
]);
});
it('should verify getStreamHistory with exact historical events format', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation&includeHistory=true&lastEventId=100',
);
// Mock getStreamHistory to return specific events
const mockEvents = [
{
type: 'stream_end',
timestamp: 300,
operationId: 'test-operation',
data: { messageId: 'msg3' },
},
{
type: 'stream_chunk',
timestamp: 250,
operationId: 'test-operation',
data: { content: 'world' },
},
{
type: 'stream_start',
timestamp: 150,
operationId: 'test-operation',
data: { messageId: 'msg1' },
},
];
mockStreamEventManager.getStreamHistory.mockResolvedValue(mockEvents);
const response = await GET(request);
const decoder = new TextDecoder();
const reader = response.body!.getReader();
// Collect all chunks
const chunks = [];
let readCount = 0;
const maxReads = 3; // connection + 2 filtered historical events (timestamp > 100)
try {
while (readCount < maxReads) {
const readPromise = reader.read();
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Read timeout')), 500),
);
const result = (await Promise.race([
readPromise,
timeoutPromise,
])) as ReadableStreamReadResult<Uint8Array>;
if (result.done) break;
if (result.value) {
const chunk =
result.value instanceof Uint8Array
? decoder.decode(result.value)
: String(result.value);
chunks.push(chunk);
readCount++;
}
}
} catch {
// Timeout or error
} finally {
reader.releaseLock();
}
// Verify exact stream format - connection event + filtered historical events (new SSE format)
expect(chunks).toEqual([
`id: conn_${MOCK_TIMESTAMP}\nevent: connected\ndata: {"lastEventId":"100","operationId":"test-operation","timestamp":${MOCK_TIMESTAMP},"type":"connected"}\n\n`,
`id: test-operation\nevent: stream_start\ndata: {"type":"stream_start","timestamp":150,"operationId":"test-operation","data":{"messageId":"msg1"}}\n\n`,
`id: test-operation\nevent: stream_chunk\ndata: {"type":"stream_chunk","timestamp":250,"operationId":"test-operation","data":{"content":"world"}}\n\n`,
]);
// Verify API calls
expect(mockStreamEventManager.getStreamHistory).toHaveBeenCalledWith('test-operation', 50);
});
it('should verify event filtering with exact format', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation&includeHistory=true&lastEventId=200',
);
// Mock events where some should be filtered out
const mockEvents = [
{
type: 'stream_end',
timestamp: 300,
operationId: 'test-operation',
data: { messageId: 'msg3' },
}, // Should be included (300 > 200)
{
type: 'stream_chunk',
timestamp: 250,
operationId: 'test-operation',
data: { content: 'world' },
}, // Should be included (250 > 200)
{
type: 'stream_chunk',
timestamp: 200,
operationId: 'test-operation',
data: { content: 'hello' },
}, // Should be excluded (200 = 200)
{
type: 'stream_start',
timestamp: 150,
operationId: 'test-operation',
data: { messageId: 'msg1' },
}, // Should be excluded (150 < 200)
];
mockStreamEventManager.getStreamHistory.mockResolvedValue(mockEvents);
const response = await GET(request);
const decoder = new TextDecoder();
const reader = response.body!.getReader();
// Collect all chunks
const chunks = [];
let readCount = 0;
const maxReads = 3; // connection + 2 filtered events
try {
while (readCount < maxReads) {
const readPromise = reader.read();
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Read timeout')), 500),
);
const result = (await Promise.race([
readPromise,
timeoutPromise,
])) as ReadableStreamReadResult<Uint8Array>;
if (result.done) break;
if (result.value) {
const chunk =
result.value instanceof Uint8Array
? decoder.decode(result.value)
: String(result.value);
chunks.push(chunk);
readCount++;
}
}
} catch {
// Timeout or error
} finally {
reader.releaseLock();
}
// Verify exact stream format - only events with timestamp > 200 are included (new SSE format)
// Note: indices are based on original array position, not filtered position
expect(chunks).toEqual([
`id: conn_${MOCK_TIMESTAMP}
event: connected
data: {"lastEventId":"200","operationId":"test-operation","timestamp":${MOCK_TIMESTAMP},"type":"connected"}
`,
`id: test-operation
event: stream_chunk
data: {"type":"stream_chunk","timestamp":250,"operationId":"test-operation","data":{"content":"world"}}
`,
`id: test-operation
event: stream_end
data: {"type":"stream_end","timestamp":300,"operationId":"test-operation","data":{"messageId":"msg3"}}
\n`,
]);
// Verify API calls
expect(mockStreamEventManager.getStreamHistory).toHaveBeenCalledWith('test-operation', 50);
});
it('should handle errors with exact error event format', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation&includeHistory=true',
);
// Mock getStreamHistory to reject
mockStreamEventManager.getStreamHistory.mockRejectedValue(
new Error('Redis connection failed'),
);
const response = await GET(request);
const decoder = new TextDecoder();
const reader = response.body!.getReader();
// Collect all chunks
const chunks = [];
let readCount = 0;
const maxReads = 2; // connection + error event
try {
while (readCount < maxReads) {
const readPromise = reader.read();
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Read timeout')), 500),
);
const result = (await Promise.race([
readPromise,
timeoutPromise,
])) as ReadableStreamReadResult<Uint8Array>;
if (result.done) break;
if (result.value) {
const chunk =
result.value instanceof Uint8Array
? decoder.decode(result.value)
: String(result.value);
chunks.push(chunk);
readCount++;
}
}
} catch {
// Timeout or error
} finally {
reader.releaseLock();
}
// Verify exact stream format - connection event + error event (new SSE format)
// Parse error event to check format (error includes stack trace dynamically)
const errorChunk = chunks[1];
expect(errorChunk).toMatch(/^id: error_\d+\nevent: error\ndata: \{.*"type":"error".*\}\n\n$/);
expect(errorChunk).toContain('"error":"Redis connection failed"');
expect(errorChunk).toContain('"phase":"history_loading"');
expect(errorChunk).toContain('"operationId":"test-operation"');
expect(errorChunk).toContain(`"timestamp":${MOCK_TIMESTAMP}`);
// Verify connection event format
expect(chunks[0]).toEqual(
`id: conn_${MOCK_TIMESTAMP}\nevent: connected\ndata: {"lastEventId":"0","operationId":"test-operation","timestamp":${MOCK_TIMESTAMP},"type":"connected"}\n\n`,
);
// Verify getStreamHistory was called
expect(mockStreamEventManager.getStreamHistory).toHaveBeenCalledWith('test-operation', 50);
});
it('should verify stream subscription with exact parameters', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation&lastEventId=456',
);
mockStreamEventManager.subscribeStreamEvents.mockResolvedValue(undefined);
const response = await GET(request);
expect(response.status).toBe(200);
// Verify exact parameter passing
expect(mockStreamEventManager.subscribeStreamEvents).toHaveBeenCalledWith(
'test-operation',
'456',
expect.any(Function), // callback function
expect.any(AbortSignal), // abort signal
);
// Verify the callback function structure
const callArgs = mockStreamEventManager.subscribeStreamEvents.mock.calls[0];
expect(callArgs).toHaveLength(4);
expect(typeof callArgs[2]).toBe('function'); // callback
expect(callArgs[3]).toBeInstanceOf(AbortSignal); // signal
});
it('should verify default parameters with exact values', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation',
);
mockStreamEventManager.subscribeStreamEvents.mockResolvedValue(undefined);
const response = await GET(request);
expect(response.status).toBe(200);
// Verify default values are used
expect(mockStreamEventManager.subscribeStreamEvents).toHaveBeenCalledWith(
'test-operation',
'0', // default lastEventId
expect.any(Function),
expect.any(AbortSignal),
);
// Verify getStreamHistory is NOT called when includeHistory defaults to false
expect(mockStreamEventManager.getStreamHistory).not.toHaveBeenCalled();
});
it('should verify SSE message structure with exact format specification', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation',
);
const response = await GET(request);
const decoder = new TextDecoder();
const reader = response.body!.getReader();
// Collect all chunks
const chunks = [];
let readCount = 0;
const maxReads = 1; // Only read connection event
try {
while (readCount < maxReads) {
const readPromise = reader.read();
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Read timeout')), 1000),
);
const result = (await Promise.race([
readPromise,
timeoutPromise,
])) as ReadableStreamReadResult<Uint8Array>;
if (result.done) break;
if (result.value) {
const chunk =
result.value instanceof Uint8Array
? decoder.decode(result.value)
: String(result.value);
chunks.push(chunk);
readCount++;
}
}
} catch {
// Timeout or error
} finally {
reader.releaseLock();
}
// Verify exact stream format with default lastEventId (new SSE format)
expect(chunks).toEqual([
`id: conn_${MOCK_TIMESTAMP}\nevent: connected\ndata: {"lastEventId":"0","operationId":"test-operation","timestamp":${MOCK_TIMESTAMP},"type":"connected"}\n\n`,
]);
});
});
describe('Agent Runtime Lifecycle', () => {
it('should verify agent runtime event handling and connection closure logic', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation',
);
// Capture the event callback so we can test the event processing logic directly
let capturedCallback: ((events: any[]) => void) | null = null;
let capturedSignal: AbortSignal | null = null;
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
(operationId, lastEventId, callback, signal) => {
capturedCallback = callback;
capturedSignal = signal;
return Promise.resolve();
},
);
const response = await GET(request);
// Verify the subscription was set up correctly
expect(mockStreamEventManager.subscribeStreamEvents).toHaveBeenCalledWith(
'test-operation',
'0',
expect.any(Function),
expect.any(AbortSignal),
);
expect(capturedCallback).toBeDefined();
expect(capturedSignal).toBeDefined();
// Verify response headers are correct
expect(response.status).toBe(200);
expect(response.headers.get('Content-Type')).toBe('text/event-stream');
// Test that the callback exists and can be called
expect(typeof capturedCallback).toBe('function');
expect(capturedSignal).toBeInstanceOf(AbortSignal);
});
it('should verify subscribeStreamEvents callback can handle agent_runtime_init events', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation',
);
let capturedCallback: ((events: any[]) => void) | null = null;
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
(operationId, lastEventId, callback, _signal) => {
capturedCallback = callback;
return Promise.resolve();
},
);
const response = await GET(request);
// Verify we captured the callback
expect(capturedCallback).toBeDefined();
expect(response.status).toBe(200);
// Test agent_runtime_init event processing
const initEvent = {
type: 'agent_runtime_init',
timestamp: MOCK_TIMESTAMP + 100,
operationId: 'test-operation',
data: {
userId: 'user-123',
modelConfig: { model: 'gpt-4', temperature: 0.7 },
agentType: 'assistant',
},
};
// The callback should be callable without throwing errors
expect(() => capturedCallback!([initEvent])).not.toThrow();
});
it('should verify subscribeStreamEvents callback can handle agent_runtime_end events', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation',
);
let capturedCallback: ((events: any[]) => void) | null = null;
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
(operationId, lastEventId, callback, _signal) => {
capturedCallback = callback;
return Promise.resolve();
},
);
const response = await GET(request);
// Verify we captured the callback
expect(capturedCallback).toBeDefined();
expect(response.status).toBe(200);
// Test agent_runtime_end event processing
const endEvent = {
type: 'agent_runtime_end',
timestamp: MOCK_TIMESTAMP + 600,
operationId: 'test-operation',
data: {
totalSteps: 1,
executionTime: 500,
status: 'completed',
},
};
// The callback should be callable without throwing errors
expect(() => capturedCallback!([endEvent])).not.toThrow();
});
it('should verify complete agent runtime lifecycle event types are supported', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation',
);
let capturedCallback: ((events: any[]) => void) | null = null;
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
(operationId, lastEventId, callback, _signal) => {
capturedCallback = callback;
return Promise.resolve();
},
);
const response = await GET(request);
expect(capturedCallback).toBeDefined();
expect(response.status).toBe(200);
// Test complete lifecycle events can be processed
const lifecycleEvents = [
{
type: 'agent_runtime_init',
timestamp: MOCK_TIMESTAMP + 100,
operationId: 'test-operation',
data: { userId: 'user-123', agentType: 'assistant' },
},
{
type: 'stream_start',
timestamp: MOCK_TIMESTAMP + 200,
operationId: 'test-operation',
data: { messageId: 'msg-001' },
},
{
type: 'stream_chunk',
timestamp: MOCK_TIMESTAMP + 300,
operationId: 'test-operation',
data: { content: 'Hello world' },
},
{
type: 'stream_end',
timestamp: MOCK_TIMESTAMP + 400,
operationId: 'test-operation',
data: { messageId: 'msg-001' },
},
{
type: 'agent_runtime_end',
timestamp: MOCK_TIMESTAMP + 500,
operationId: 'test-operation',
data: { status: 'completed', totalSteps: 1 },
},
];
// All lifecycle events should be processable without throwing errors
expect(() => capturedCallback!(lifecycleEvents)).not.toThrow();
});
});
describe('Heartbeat and connection lifecycle', () => {
it('should close connection immediately after agent_runtime_end', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation',
);
let capturedCallback: ((events: any[]) => void) | null = null;
let capturedSignal: AbortSignal | null = null;
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
(operationId, lastEventId, callback, signal) => {
capturedCallback = callback;
capturedSignal = signal;
return new Promise(() => {});
},
);
const response = await GET(request);
expect(capturedCallback).toBeDefined();
expect(capturedSignal).toBeDefined();
// Signal should not be aborted initially
expect(capturedSignal!.aborted).toBe(false);
// Simulate agent_runtime_end event
const endEvent = {
type: 'agent_runtime_end',
timestamp: MOCK_TIMESTAMP + 1000,
operationId: 'test-operation',
data: { status: 'completed' },
};
capturedCallback!([endEvent]);
// Signal should be aborted immediately after agent_runtime_end
expect(capturedSignal!.aborted).toBe(true);
});
it('should set streamEnded flag and close connection when agent_runtime_end is received', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation',
);
let capturedCallback: ((events: any[]) => void) | null = null;
let capturedSignal: AbortSignal | null = null;
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
(operationId, lastEventId, callback, signal) => {
capturedCallback = callback;
capturedSignal = signal;
return new Promise(() => {});
},
);
const response = await GET(request);
expect(response.status).toBe(200);
expect(capturedCallback).toBeDefined();
expect(capturedSignal).toBeDefined();
// Simulate agent_runtime_end event - this should set streamEnded = true
const endEvent = {
type: 'agent_runtime_end',
timestamp: MOCK_TIMESTAMP + 1000,
operationId: 'test-operation',
data: { status: 'completed' },
};
// This should not throw - verifies the callback can handle the event
expect(() => capturedCallback!([endEvent])).not.toThrow();
// Signal should be aborted (connection closed)
expect(capturedSignal!.aborted).toBe(true);
});
it('should handle agent_runtime_end event in callback without errors', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation',
);
let capturedCallback: ((events: any[]) => void) | null = null;
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
(operationId, lastEventId, callback, _signal) => {
capturedCallback = callback;
return new Promise(() => {});
},
);
const response = await GET(request);
expect(response.status).toBe(200);
expect(capturedCallback).toBeDefined();
// Simulate agent_runtime_end with full data
const endEvent = {
type: 'agent_runtime_end',
timestamp: MOCK_TIMESTAMP + 1000,
operationId: 'test-operation',
data: {
finalState: { status: 'completed' },
reason: 'completed',
reasonDetail: 'Agent runtime completed successfully',
},
};
// Verify the event is processed without throwing
expect(() => capturedCallback!([endEvent])).not.toThrow();
});
it('should handle batch events including agent_runtime_end without errors', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation',
);
let capturedCallback: ((events: any[]) => void) | null = null;
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
(operationId, lastEventId, callback, _signal) => {
capturedCallback = callback;
return new Promise(() => {});
},
);
const response = await GET(request);
expect(response.status).toBe(200);
expect(capturedCallback).toBeDefined();
// Simulate batch of events ending with agent_runtime_end
const batchEvents = [
{
type: 'stream_chunk',
timestamp: MOCK_TIMESTAMP + 800,
operationId: 'test-operation',
data: { content: 'Final chunk' },
},
{
type: 'stream_end',
timestamp: MOCK_TIMESTAMP + 900,
operationId: 'test-operation',
data: { messageId: 'msg-001' },
},
{
type: 'agent_runtime_end',
timestamp: MOCK_TIMESTAMP + 1000,
operationId: 'test-operation',
data: { status: 'completed' },
},
];
// All events should be processed without throwing
expect(() => capturedCallback!(batchEvents)).not.toThrow();
});
it('should skip events after streamEnded flag is set', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test-operation',
);
let capturedCallback: ((events: any[]) => void) | null = null;
let capturedSignal: AbortSignal | null = null;
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
(operationId, lastEventId, callback, signal) => {
capturedCallback = callback;
capturedSignal = signal;
return new Promise(() => {});
},
);
const response = await GET(request);
expect(response.status).toBe(200);
expect(capturedCallback).toBeDefined();
expect(capturedSignal).toBeDefined();
expect(capturedSignal!.aborted).toBe(false);
// First, send agent_runtime_end to set streamEnded = true
capturedCallback!([
{
type: 'agent_runtime_end',
timestamp: MOCK_TIMESTAMP + 1000,
operationId: 'test-operation',
data: { status: 'completed' },
},
]);
// Signal should be aborted immediately
expect(capturedSignal!.aborted).toBe(true);
// Any subsequent events should be skipped (no errors)
expect(() =>
capturedCallback!([
{
type: 'step_complete',
timestamp: MOCK_TIMESTAMP + 1100,
operationId: 'test-operation',
data: { stepIndex: 1 },
},
]),
).not.toThrow();
});
});
describe('Parameter validation', () => {
it('should handle operationId with special characters', async () => {
const operationId = 'test-operation-123_456';
const request = new NextRequest(
`https://test.com/api/agent/stream?operationId=${operationId}`,
);
const response = await GET(request);
expect(response.status).toBe(200);
});
it('should handle lastEventId as string number', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test&lastEventId=12345',
);
const response = await GET(request);
expect(response.status).toBe(200);
});
it('should handle includeHistory as string boolean', async () => {
const request = new NextRequest(
'https://test.com/api/agent/stream?operationId=test&includeHistory=false',
);
const response = await GET(request);
expect(response.status).toBe(200);
expect(mockStreamEventManager.getStreamHistory).not.toHaveBeenCalled();
});
it('should handle invalid URL gracefully', async () => {
const request = new NextRequest('https://test.com/api/agent/stream?operationId=');
const response = await GET(request);
expect(response.status).toBe(400);
const data = await response.json();
expect(data.error).toBe('operationId parameter is required');
});
});
});
-213
View File
@@ -1,213 +0,0 @@
import { createSSEHeaders, createSSEWriter } from '@lobechat/utils/server';
import debug from 'debug';
import { type NextRequest } from 'next/server';
import { NextResponse } from 'next/server';
import { createStreamEventManager } from '@/server/modules/AgentRuntime';
const log = debug('api-route:agent:stream');
const timing = debug('lobe-server:agent-runtime:timing');
/**
* Server-Sent Events (SSE) endpoint
* Provides real-time Agent execution event stream for clients
*/
export async function GET(request: NextRequest) {
// Initialize stream event manager (uses InMemory singleton in local dev, Redis in production)
const streamManager = createStreamEventManager();
const { searchParams } = new URL(request.url);
const operationId = searchParams.get('operationId');
const lastEventId = searchParams.get('lastEventId') || '0';
const includeHistory = searchParams.get('includeHistory') === 'true';
if (!operationId) {
return NextResponse.json(
{
error: 'operationId parameter is required',
},
{ status: 400 },
);
}
log(`Starting SSE connection for operation ${operationId} from eventId ${lastEventId}`);
// Create Server-Sent Events stream
const stream = new ReadableStream({
cancel(reason) {
log(`SSE connection cancelled for operation ${operationId}:`, reason);
// Call cleanup function
if ((this as any)._cleanup) {
(this as any)._cleanup();
}
},
start(controller) {
const writer = createSSEWriter(controller);
// Send connection confirmation event
writer.writeConnection(operationId, lastEventId);
log(`SSE connection established for operation ${operationId}`);
// If needed, send historical events first
if (includeHistory) {
streamManager
.getStreamHistory(operationId, 50)
.then((history) => {
// Send historical events in chronological order (earliest first)
const sortedHistory = history.reverse();
sortedHistory.forEach((event) => {
// Only send events newer than lastEventId
if (!lastEventId || lastEventId === '0' || event.timestamp.toString() > lastEventId) {
try {
// Add SSE-specific fields, keeping format consistent with real-time events
const sseEvent = {
...event,
operationId,
timestamp: event.timestamp || Date.now(),
};
writer.writeStreamEvent(sseEvent, operationId);
} catch (error) {
console.error('[Agent Stream] Error sending history event:', error);
}
}
});
if (sortedHistory.length > 0) {
log(`Sent ${sortedHistory.length} historical events for operation ${operationId}`);
}
})
.catch((error) => {
console.error('[Agent Stream] Failed to load history:', error);
try {
writer.writeError(error, operationId, 'history_loading');
} catch (controllerError) {
console.error('[Agent Stream] Failed to send error event:', controllerError);
}
});
}
// Create AbortController for canceling subscription
const abortController = new AbortController();
// Track if stream has ended (agent_runtime_end received)
// Once set to true, no more events will be sent
let streamEnded = false;
// Send heartbeat periodically (every 30 seconds)
const heartbeatInterval = setInterval(() => {
// Skip heartbeat if stream has ended
if (streamEnded) {
return;
}
try {
const heartbeat = {
operationId,
timestamp: Date.now(),
type: 'heartbeat',
};
controller.enqueue(`data: ${JSON.stringify(heartbeat)}\n\n`);
} catch (error) {
console.error('[Agent Stream] Heartbeat error:', error);
clearInterval(heartbeatInterval);
}
}, 30_000);
// Cleanup function
const cleanup = () => {
abortController.abort();
clearInterval(heartbeatInterval);
log(`SSE connection closed for operation ${operationId}`);
};
// Subscribe to new streaming events
const subscribeToEvents = async () => {
try {
await streamManager.subscribeStreamEvents(
operationId,
lastEventId,
(events) => {
events.forEach((event) => {
// Skip all events if stream has ended
if (streamEnded) {
return;
}
try {
// Add SSE-specific fields
const sseEvent = {
...event,
operationId,
timestamp: event.timestamp || Date.now(),
};
const now = Date.now();
const totalLatency = now - sseEvent.timestamp;
writer.writeStreamEvent(sseEvent, operationId);
timing(
'[%s:%d] SSE sent %s, original timestamp %d, sent at %d, total latency %dms',
operationId,
event.stepIndex,
event.type,
sseEvent.timestamp,
now,
totalLatency,
);
// If agent_runtime_end event is received, terminate stream immediately
if (event.type === 'agent_runtime_end') {
log(
`Agent runtime ended for operation ${operationId}, terminating stream immediately`,
);
// Mark stream as ended to prevent any more events
streamEnded = true;
// Immediately cleanup and close connection
cleanup();
controller.close();
log(
`SSE connection closed after agent runtime end for operation ${operationId}`,
);
}
} catch (error) {
console.error('[Agent Stream] Error sending event:', error);
}
});
},
abortController.signal,
);
} catch (error) {
if (!abortController.signal.aborted) {
console.error('[Agent Stream] Subscription error:', error);
try {
writer.writeError(error as Error, operationId, 'stream_subscription');
} catch (controllerError) {
console.error('[Agent Stream] Failed to send subscription error:', controllerError);
}
}
}
};
// Start subscription
subscribeToEvents();
// Listen for connection close
request.signal?.addEventListener('abort', cleanup);
// Store cleanup function for calling during cancel
(controller as any)._cleanup = cleanup;
},
});
// Set SSE response headers
return new Response(stream, {
headers: createSSEHeaders(),
});
}
@@ -1,228 +0,0 @@
// @vitest-environment happy-dom
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { agentRuntimeClient } from '../client';
// Mock fetchEventSource
const mockFetchEventSource = vi.fn();
vi.mock('@lobechat/utils/client', () => ({
fetchEventSource: (url: string, options: any) => mockFetchEventSource(url, options),
}));
describe('AgentRuntimeClient', () => {
beforeEach(() => {
vi.clearAllMocks();
});
describe('createStreamConnection', () => {
it('should create connection with correct URL and parameters', () => {
const operationId = 'agent_1758302563222_0g28qmdmu';
agentRuntimeClient.createStreamConnection(operationId, {
includeHistory: false,
lastEventId: '0',
});
expect(mockFetchEventSource).toHaveBeenCalledWith(
'/api/agent/stream?includeHistory=false&lastEventId=0&operationId=agent_1758302563222_0g28qmdmu',
expect.objectContaining({
headers: {
'Cache-Control': 'no-cache',
'Last-Event-ID': '0',
},
signal: expect.any(AbortSignal),
}),
);
});
it('should handle complete agent runtime lifecycle with real stream data', async () => {
const operationId = 'agent_1758302563222_abc';
const events: any[] = [];
let connectCalled = false;
let disconnectCalled = false;
// Capture the callbacks passed to fetchEventSource
mockFetchEventSource.mockImplementation((url: string, options: any) => {
// Simulate connection opening
setTimeout(() => {
options.onopen?.({ ok: true, status: 200, statusText: 'OK' });
}, 10);
// Simulate receiving events
setTimeout(() => {
const streamEvents = [
`{"lastEventId":"0","operationId":"${operationId}","timestamp":1758302567925,"type":"connected"}`,
`{"type":"agent_runtime_init","stepIndex":0,"operationId":"${operationId}","data":{"agentConfig":{"enableSearch":true}},"timestamp":1758302564421}`,
`{"type":"stream_start","stepIndex":0,"operationId":"${operationId}","data":{"messageId":"msg1","model":"gpt-4"},"timestamp":1758302574552}`,
`{"type":"stream_chunk","stepIndex":0,"operationId":"${operationId}","data":{"content":"Hello"},"timestamp":1758302578042}`,
`{"type":"stream_end","stepIndex":0,"operationId":"${operationId}","data":{"finalContent":"Hello world"},"timestamp":1758302626595}`,
`{"type":"agent_runtime_end","stepIndex":0,"operationId":"${operationId}","data":{"status":"completed"},"timestamp":1758302631030}`,
];
streamEvents.forEach((eventData) => {
options.onmessage?.({ data: eventData, event: 'message' });
});
}, 20);
});
agentRuntimeClient.createStreamConnection(operationId, {
includeHistory: false,
onConnect: () => {
connectCalled = true;
},
onDisconnect: () => {
disconnectCalled = true;
},
onEvent: (event) => {
events.push(event);
},
});
// Wait for all events
await new Promise((resolve) => setTimeout(resolve, 50));
expect(connectCalled).toBe(true);
expect(events).toHaveLength(6);
// Verify event sequence
expect(events[0].type).toBe('connected');
expect(events[1].type).toBe('agent_runtime_init');
expect(events[2].type).toBe('stream_start');
expect(events[3].type).toBe('stream_chunk');
expect(events[4].type).toBe('stream_end');
expect(events[5].type).toBe('agent_runtime_end');
// Verify operationId consistency
events.forEach((event) => {
expect(event.operationId).toBe(operationId);
});
});
it('should handle heartbeat events correctly', async () => {
const operationId = 'test-operation';
const events: any[] = [];
mockFetchEventSource.mockImplementation((url: string, options: any) => {
setTimeout(() => {
const heartbeatData = `{"operationId":"${operationId}","timestamp":1758302597927,"type":"heartbeat"}`;
options.onmessage?.({ data: heartbeatData, event: 'message' });
}, 10);
});
agentRuntimeClient.createStreamConnection(operationId, {
onEvent: (event) => {
events.push(event);
},
});
await new Promise((resolve) => setTimeout(resolve, 30));
expect(events).toHaveLength(1);
expect(events[0].type).toBe('heartbeat');
expect(events[0].operationId).toBe(operationId);
});
it('should handle connection errors', async () => {
const operationId = 'test-operation';
let errorOccurred = false;
mockFetchEventSource.mockImplementation((url: string, options: any) => {
setTimeout(() => {
options.onerror?.(new Error('Connection failed'));
}, 10);
});
agentRuntimeClient.createStreamConnection(operationId, {
onError: (error) => {
errorOccurred = true;
expect(error.message).toBe('Connection failed');
},
});
await new Promise((resolve) => setTimeout(resolve, 30));
expect(errorOccurred).toBe(true);
});
it('should handle malformed JSON gracefully', async () => {
const operationId = 'test-operation';
const events: any[] = [];
let errorOccurred = false;
mockFetchEventSource.mockImplementation((url: string, options: any) => {
setTimeout(() => {
options.onmessage?.({ data: 'invalid json', event: 'message' });
}, 10);
});
agentRuntimeClient.createStreamConnection(operationId, {
onEvent: (event) => {
events.push(event);
},
onError: (error) => {
errorOccurred = true;
},
});
await new Promise((resolve) => setTimeout(resolve, 30));
expect(events).toHaveLength(0);
expect(errorOccurred).toBe(true);
});
it('should call onDisconnect when connection is closed', async () => {
const operationId = 'test-operation';
let disconnectCalled = false;
mockFetchEventSource.mockImplementation((url: string, options: any) => {
setTimeout(() => {
options.onclose?.();
}, 10);
});
agentRuntimeClient.createStreamConnection(operationId, {
onDisconnect: () => {
disconnectCalled = true;
},
});
await new Promise((resolve) => setTimeout(resolve, 30));
expect(disconnectCalled).toBe(true);
});
it('should include correct parameters in URL', () => {
const operationId = 'test-operation-123';
agentRuntimeClient.createStreamConnection(operationId, {
includeHistory: true,
lastEventId: '12345',
});
expect(mockFetchEventSource).toHaveBeenCalledWith(
'/api/agent/stream?includeHistory=true&lastEventId=12345&operationId=test-operation-123',
expect.any(Object),
);
});
it('should use default parameters when not provided', () => {
const operationId = 'test-operation';
agentRuntimeClient.createStreamConnection(operationId);
expect(mockFetchEventSource).toHaveBeenCalledWith(
'/api/agent/stream?includeHistory=false&lastEventId=0&operationId=test-operation',
expect.any(Object),
);
});
it('should return AbortController for cancellation', () => {
const operationId = 'test-operation';
const controller = agentRuntimeClient.createStreamConnection(operationId);
expect(controller).toBeInstanceOf(AbortController);
expect(controller.signal).toBeInstanceOf(AbortSignal);
});
});
});
-77
View File
@@ -1,77 +0,0 @@
import { fetchEventSource } from '@lobechat/utils/client';
import debug from 'debug';
import { type StreamConnectionOptions, type StreamEvent } from './type';
const log = debug('lobe-agent-runtime:client');
/**
* Agent Client Service for communicating with durable agents
*/
class AgentRuntimeClient {
private baseUrl = '/api/agent';
/**
* Create a streaming connection to receive real-time agent events
*/
createStreamConnection(
operationId: string,
options: StreamConnectionOptions = {},
): AbortController {
const {
includeHistory = false,
lastEventId = '0',
onEvent,
onError,
onConnect,
onDisconnect,
} = options;
const params = new URLSearchParams({
includeHistory: includeHistory.toString(),
lastEventId,
operationId,
});
const controller = new AbortController();
fetchEventSource(`${this.baseUrl}/stream?${params}`, {
headers: {
'Cache-Control': 'no-cache',
'Last-Event-ID': lastEventId,
},
onclose: () => {
log(`Stream connection closed for operation ${operationId}`);
onDisconnect?.();
},
onerror: (error) => {
console.error(`[AgentClientService] Stream error for operation ${operationId}:`, error);
onError?.(error instanceof Error ? error : new Error('Stream connection error'));
},
onmessage: (event) => {
try {
const data = JSON.parse(event.data) as StreamEvent;
log(`Received event: ${event.event || 'message'}`, event.data);
onEvent?.(data);
} catch (error) {
console.error('[AgentClientService] Failed to parse stream event:', error);
onError?.(new Error('Failed to parse stream event'));
}
},
onopen: async (response) => {
if (response.ok) {
log(`Stream connection opened for operation ${operationId}`);
onConnect?.();
} else {
throw new Error(`Failed to open stream: ${response.status} ${response.statusText}`);
}
},
signal: controller.signal,
});
return controller;
}
}
export const agentRuntimeClient = new AgentRuntimeClient();
-1
View File
@@ -7,7 +7,6 @@ import { contextEngineering } from '@/services/chat/mecha';
import { getAgentStoreState } from '@/store/agent';
import { agentChatConfigSelectors, agentSelectors } from '@/store/agent/selectors';
export { agentRuntimeClient } from './client';
export * from './type';
interface AgentOperationRequest {
@@ -1,8 +1,7 @@
import { act, renderHook } from '@testing-library/react';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';
import { lambdaClient } from '@/libs/trpc/client';
import { agentRuntimeClient } from '@/services/agentRuntime';
import { useChatStore } from '@/store/chat/store';
// Keep zustand mock as it's needed globally
@@ -24,14 +23,35 @@ vi.mock('@/libs/trpc/client', () => ({
},
}));
// Mock agentRuntimeClient
vi.mock('@/services/agentRuntime', () => ({
agentRuntimeClient: {
createStreamConnection: vi.fn(),
},
StreamEvent: {},
// Mock AgentGatewayClient
vi.mock('@lobechat/agent-gateway-client', () => ({
AgentGatewayClient: vi.fn().mockImplementation(() => ({
connect: vi.fn(),
disconnect: vi.fn(),
on: vi.fn(),
off: vi.fn(),
sendInterrupt: vi.fn(),
sendToolConfirmation: vi.fn(),
sendUserInput: vi.fn(),
connectionStatus: 'disconnected',
})),
}));
// Mock global_serverConfigStore on window
const mockServerConfigStore = {
getState: vi.fn().mockReturnValue({
serverConfig: { agentGatewayUrl: 'https://test-gateway.example.com' },
}),
};
beforeAll(() => {
(window as any).global_serverConfigStore = mockServerConfigStore;
});
afterAll(() => {
delete (window as any).global_serverConfigStore;
});
// Test Constants
const TEST_IDS = {
AGENT_ID: 'test-agent-id',
@@ -134,6 +154,12 @@ describe('agentGroup actions', () => {
onOperationCancel: vi.fn(),
switchTopic: vi.fn(),
associateMessageWithOperation: vi.fn(),
isAgentGatewayAvailable: vi.fn().mockReturnValue(true),
internal_connectAgentGateway: vi.fn().mockReturnValue({
connect: vi.fn(),
disconnect: vi.fn(),
sendInterrupt: vi.fn(),
}),
});
});
});
@@ -218,7 +244,7 @@ describe('agentGroup actions', () => {
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
createMockExecGroupAgentResponse(),
);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
// Gateway client is auto-mocked
const context = createTestContext();
@@ -249,7 +275,7 @@ describe('agentGroup actions', () => {
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
createMockExecGroupAgentResponse(),
);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
// Gateway client is auto-mocked
await act(async () => {
await result.current.sendGroupMessage({
@@ -291,7 +317,7 @@ describe('agentGroup actions', () => {
capturedState = useChatStore.getState().isCreatingMessage;
return createMockExecGroupAgentResponse();
});
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
// Gateway client is auto-mocked
await act(async () => {
await result.current.sendGroupMessage({
@@ -313,7 +339,7 @@ describe('agentGroup actions', () => {
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
createMockExecGroupAgentResponse(),
);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
// Gateway client is auto-mocked
const context = createTestContext({ topicId: TEST_IDS.TOPIC_ID });
@@ -342,7 +368,7 @@ describe('agentGroup actions', () => {
const mockResponse = createMockExecGroupAgentResponse();
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(mockResponse);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
// Gateway client is auto-mocked
const context = createTestContext();
@@ -371,7 +397,7 @@ describe('agentGroup actions', () => {
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
createMockExecGroupAgentResponse(),
);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
// Gateway client is auto-mocked
await act(async () => {
await result.current.sendGroupMessage({
@@ -394,13 +420,12 @@ describe('agentGroup actions', () => {
);
});
it('should register cancel handler for SSE stream', async () => {
it('should connect to gateway with correct params when gateway is available', async () => {
const { result } = renderHook(() => useChatStore());
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
createMockExecGroupAgentResponse(),
);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
await act(async () => {
await result.current.sendGroupMessage({
@@ -409,9 +434,13 @@ describe('agentGroup actions', () => {
});
});
expect(result.current.onOperationCancel).toHaveBeenCalledWith(
TEST_IDS.OPERATION_ID,
expect.any(Function),
expect(result.current.internal_connectAgentGateway).toHaveBeenCalledWith(
TEST_IDS.OPERATION_ID, // chatKey = operationId
expect.objectContaining({
assistantId: TEST_IDS.ASSISTANT_MESSAGE_ID,
execOperationId: 'op-exec',
streamOperationId: TEST_IDS.OPERATION_ID,
}),
);
});
@@ -421,7 +450,7 @@ describe('agentGroup actions', () => {
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
createMockExecGroupAgentResponse(),
);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
// Gateway client is auto-mocked
await act(async () => {
await result.current.sendGroupMessage({
@@ -446,13 +475,12 @@ describe('agentGroup actions', () => {
expect(result.current.associateMessageWithOperation).toHaveBeenCalledTimes(2);
});
it('should create stream connection with operationId', async () => {
it('should connect to agent gateway with operationId as chatKey', async () => {
const { result } = renderHook(() => useChatStore());
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
createMockExecGroupAgentResponse(),
);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
await act(async () => {
await result.current.sendGroupMessage({
@@ -461,50 +489,15 @@ describe('agentGroup actions', () => {
});
});
expect(agentRuntimeClient.createStreamConnection).toHaveBeenCalledWith(
TEST_IDS.OPERATION_ID,
// Should have called internal_connectAgentGateway (which creates gateway client)
expect(result.current.startOperation).toHaveBeenCalledWith(
expect.objectContaining({
includeHistory: false,
onConnect: expect.any(Function),
onDisconnect: expect.any(Function),
onError: expect.any(Function),
onEvent: expect.any(Function),
operationId: TEST_IDS.OPERATION_ID,
type: 'groupAgentStream',
}),
);
});
it('should complete operations on stream disconnect', async () => {
const { result } = renderHook(() => useChatStore());
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
createMockExecGroupAgentResponse(),
);
let onDisconnectCallback: (() => void) | undefined;
vi.mocked(agentRuntimeClient.createStreamConnection).mockImplementation(
(_operationId, options) => {
onDisconnectCallback = options?.onDisconnect;
return {} as any;
},
);
await act(async () => {
await result.current.sendGroupMessage({
context: createTestContext(),
message: TEST_CONTENT.GROUP_MESSAGE,
});
});
// Simulate stream disconnect
await act(async () => {
onDisconnectCallback?.();
});
// Should complete both the stream operation and the main execServerAgentRuntime operation
expect(result.current.completeOperation).toHaveBeenCalledWith(TEST_IDS.OPERATION_ID);
expect(result.current.completeOperation).toHaveBeenCalledWith('op-exec');
});
it('should update topics when new topic is created', async () => {
const { result } = renderHook(() => useChatStore());
@@ -518,7 +511,7 @@ describe('agentGroup actions', () => {
topics: mockTopics,
}),
);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
// Gateway client is auto-mocked
await act(async () => {
await result.current.sendGroupMessage({
@@ -546,7 +539,7 @@ describe('agentGroup actions', () => {
topics: { items: [], total: 1 },
}),
);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
// Gateway client is auto-mocked
await act(async () => {
await result.current.sendGroupMessage({
@@ -569,7 +562,7 @@ describe('agentGroup actions', () => {
isCreateNewTopic: false,
}),
);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
// Gateway client is auto-mocked
await act(async () => {
await result.current.sendGroupMessage({
@@ -710,7 +703,7 @@ describe('agentGroup actions', () => {
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
createMockExecGroupAgentResponse(),
);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
// Gateway client is auto-mocked
await act(async () => {
await result.current.sendGroupMessage({
@@ -734,7 +727,7 @@ describe('agentGroup actions', () => {
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
createMockExecGroupAgentResponse(),
);
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
// Gateway client is auto-mocked
await act(async () => {
await result.current.sendGroupMessage({
@@ -5,8 +5,6 @@ import { nanoid } from '@lobechat/utils';
import debug from 'debug';
import { lambdaClient } from '@/libs/trpc/client';
import { type StreamEvent } from '@/services/agentRuntime';
import { agentRuntimeClient } from '@/services/agentRuntime';
import { type ChatStore } from '@/store/chat/store';
import { type StoreSetter } from '@/store/types';
import { setNamespace } from '@/utils/storeDebug';
@@ -39,8 +37,7 @@ export class ChatGroupChatActionImpl {
return;
}
const { internal_handleAgentStreamEvent, optimisticCreateTmpMessage, startOperation } =
this.#get();
const { optimisticCreateTmpMessage, startOperation } = this.#get();
log(
'sendGroupMessage: agentId=%s, groupId=%s, message=%s',
@@ -158,18 +155,10 @@ export class ChatGroupChatActionImpl {
return;
}
// 8. Create streaming context - use assistantMessageId from backend response
const streamContext = {
assistantId: result.assistantMessageId,
content: '',
reasoning: '',
tmpAssistantId: tempAssistantId, // Used for cleanup if needed
};
// 9. Start child operation for SSE stream using backend operationId
// 8. Start child operation for gateway stream using backend operationId
this.#get().startOperation({
context: { ...execContext, messageId: result.assistantMessageId },
label: 'Group Agent Stream',
label: 'Group Agent Gateway',
operationId: result.operationId,
parentOperationId: execOperationId,
type: 'groupAgentStream',
@@ -181,40 +170,20 @@ export class ChatGroupChatActionImpl {
this.#get().associateMessageWithOperation(result.assistantMessageId, execOperationId);
this.#get().associateMessageWithOperation(result.assistantMessageId, result.operationId);
// 10. Connect to SSE stream
// Server will automatically close the connection after sending agent_runtime_end event
const eventSource = agentRuntimeClient.createStreamConnection(result.operationId, {
includeHistory: false,
onConnect: () => {
log('Stream connected to %s', result.operationId);
},
onDisconnect: () => {
log('Stream disconnected from %s', result.operationId);
// Complete both operations when stream disconnects (either by server close or client abort)
this.#get().completeOperation(result.operationId);
this.#get().completeOperation(execOperationId);
},
onError: (error: Error) => {
log('Stream error for %s: %O', result.operationId, error);
// Fail the stream operation on error
this.#get().failOperation(result.operationId, {
message: error.message,
type: 'AgentStreamError',
});
if (streamContext.assistantId) {
this.#get().internal_handleAgentError(streamContext.assistantId, error.message);
}
},
onEvent: async (event: StreamEvent) => {
await internal_handleAgentStreamEvent(result.operationId, event, streamContext);
},
});
// 11. Register cancel handler for aborting SSE stream
this.#get().onOperationCancel(result.operationId, () => {
log('Cancelling SSE stream for operation %s', result.operationId);
eventSource.abort();
});
// 9. Connect to Agent Gateway via WebSocket (if available)
if (this.#get().isAgentGatewayAvailable()) {
const chatKey = result.operationId; // Use operationId as chatKey for gateway routing
this.#get().internal_connectAgentGateway(chatKey, {
assistantId: result.assistantMessageId,
execOperationId,
streamOperationId: result.operationId,
});
} else {
log('Agent gateway not available, server-side execution will use fallback path');
// Complete operations — the backend handles execution independently
this.#get().completeOperation(result.operationId);
this.#get().completeOperation(execOperationId);
}
} catch (error) {
// Check if this is an abort error (user cancelled the operation)
const isAbortError =
@@ -1,3 +1,12 @@
import {
type AgentEventMessage,
AgentGatewayClient,
type IAgentGatewayClient,
type InputRequestMessage,
type SessionCompleteMessage,
type StatusChangeMessage,
type ToolConfirmationRequestMessage,
} from '@lobechat/agent-gateway-client';
import { isDesktop } from '@lobechat/const';
import { type ChatToolPayload } from '@lobechat/types';
import debug from 'debug';
@@ -25,6 +34,15 @@ type Setter = StoreSetter<ChatStore>;
export const agentSlice = (set: Setter, get: () => ChatStore, _api?: unknown) =>
new AgentActionImpl(set, get, _api);
/**
* Read the agent gateway URL from server config.
* Returns undefined if not configured (feature disabled).
*/
const getAgentGatewayUrl = (): string | undefined => {
if (typeof window === 'undefined' || !window.global_serverConfigStore) return undefined;
return window.global_serverConfigStore.getState()?.serverConfig?.agentGatewayUrl;
};
export class AgentActionImpl {
readonly #get: () => ChatStore;
@@ -334,6 +352,199 @@ export class AgentActionImpl {
}
};
// ─── Agent Gateway (WebSocket) ───
/**
* Check if the Agent Gateway is available (configured via server config).
*/
isAgentGatewayAvailable = (): boolean => {
return !!getAgentGatewayUrl();
};
/**
* Connect to Agent Gateway via WebSocket and wire up event handlers.
* Returns the gateway client instance (stored on the operation for cancel handling).
*/
internal_connectAgentGateway = (
chatKey: string,
params: {
assistantId: string;
execOperationId: string;
streamOperationId: string;
},
): IAgentGatewayClient | undefined => {
const { assistantId, execOperationId, streamOperationId } = params;
const gatewayUrl = getAgentGatewayUrl();
if (!gatewayUrl) {
log('Agent gateway URL not configured, skipping WebSocket connection');
this.#get().failOperation(streamOperationId, {
message: 'Agent gateway URL not configured',
type: 'AgentGatewayError',
});
return undefined;
}
// Token is empty for now — the gateway authenticates via cookie/session
// forwarded through the WebSocket handshake. When a dedicated gateway JWT
// endpoint is added, replace this with the fetched token.
const token = '';
const client = new AgentGatewayClient({ gatewayUrl, token });
const streamContext = { assistantId, content: '', reasoning: '' };
client.on('connected', () => {
log('Gateway connected for %s', chatKey);
});
client.on('disconnected', () => {
log('Gateway disconnected for %s', chatKey);
this.#get().completeOperation(streamOperationId);
this.#get().completeOperation(execOperationId);
});
client.on('error', (error: Error | Event) => {
log('Gateway error for %s: %O', chatKey, error);
this.#get().failOperation(streamOperationId, {
message: error instanceof Error ? error.message : 'Gateway connection error',
type: 'AgentGatewayError',
});
this.#get().internal_handleAgentError(assistantId, 'Gateway connection error');
});
client.on('agent_event', (message: AgentEventMessage) => {
this.internal_handleGatewayAgentEvent(assistantId, message, streamContext);
});
client.on('tool_confirmation_request', (message: ToolConfirmationRequestMessage) => {
log('Tool confirmation request: %s', message.toolCallId);
this.#get().updateOperationMetadata(streamOperationId, {
gatewayClient: client,
needsHumanInput: true,
pendingApproval: [{ tool: message.tool, toolCallId: message.toolCallId }],
});
this.#get().internal_toggleMessageLoading(false, assistantId);
});
client.on('input_request', (message: InputRequestMessage) => {
log('User input request: %s', message.requestId);
this.#get().updateOperationMetadata(streamOperationId, {
gatewayClient: client,
needsHumanInput: true,
pendingPrompt: message.prompt,
pendingRequestId: message.requestId,
});
this.#get().internal_toggleMessageLoading(false, assistantId);
});
client.on('session_complete', (_message: SessionCompleteMessage) => {
log('Session complete for %s', chatKey);
this.#get().internal_toggleMessageLoading(false, assistantId);
this.#get().completeOperation(streamOperationId);
this.#get().completeOperation(execOperationId);
// Mark unread completion for background conversations
const op = this.#get().operations[streamOperationId];
if (op?.context.agentId) {
this.#get().markUnreadCompleted(op.context.agentId, op.context.topicId);
}
// Disconnect after completion
client.disconnect();
});
client.on('status_update', (message: StatusChangeMessage) => {
log('Status change for %s: %s', chatKey, message.status);
if (message.status === 'error' || message.status === 'interrupted') {
this.#get().internal_toggleMessageLoading(false, assistantId);
}
});
// Register cancel handler
this.#get().onOperationCancel(streamOperationId, () => {
log('Cancelling gateway connection for %s', chatKey);
client.sendInterrupt();
client.disconnect();
});
// Connect
client.connect(chatKey);
return client;
};
/**
* Handle agent_event messages from the gateway.
* Maps gateway AgentStreamEvent kinds to store updates.
*/
private internal_handleGatewayAgentEvent = (
assistantId: string,
message: AgentEventMessage,
context: { assistantId: string; content: string; reasoning: string },
): void => {
const { internal_dispatchMessage } = this.#get();
const event = message.event;
switch (event.kind) {
case 'text_delta': {
context.content += event.content;
internal_dispatchMessage({
id: assistantId,
type: 'updateMessage',
value: { content: context.content },
});
break;
}
case 'thinking': {
context.reasoning += event.content;
internal_dispatchMessage({
id: assistantId,
type: 'updateMessage',
value: { reasoning: { content: context.reasoning } },
});
break;
}
case 'tool_call_start': {
// Tool call started - could update tools display
log('Tool call start: %s (%s)', event.name, event.toolCallId);
break;
}
case 'tool_call_delta': {
// Tool call arguments streaming
break;
}
case 'tool_call_end': {
log('Tool call end: %s', event.toolCallId);
break;
}
case 'tool_result': {
log('Tool result for %s', event.toolCallId);
// Refresh messages to display tool results from server
this.#get().refreshMessages();
break;
}
case 'step_complete': {
log('Step %d complete', event.stepIndex);
break;
}
case 'message_complete': {
log('Message complete: %s', event.messageId);
this.#get().internal_toggleMessageLoading(false, assistantId);
// Refresh to get final persisted state from server
this.#get().refreshMessages();
break;
}
}
};
internal_handleHumanIntervention = async (
assistantId: string,
action: string,
@@ -355,12 +566,30 @@ export class AgentActionImpl {
try {
log(`Handling human intervention ${action} for operation ${messageOpId}:`, data);
// Send human intervention request
await agentRuntimeService.handleHumanIntervention({
action: action as any,
data,
operationId: messageOpId,
});
// Check if this operation has a gateway client (WebSocket path)
const gatewayClient = operation.metadata.gatewayClient as IAgentGatewayClient | undefined;
if (gatewayClient) {
// Send via WebSocket
if (action === 'approve' || action === 'reject') {
const toolCallId =
data?.toolCallId || operation.metadata.pendingApproval?.[0]?.toolCallId;
if (toolCallId) {
gatewayClient.sendToolConfirmation(toolCallId, action === 'approve');
}
} else if (action === 'user_input') {
const requestId = data?.requestId || operation.metadata.pendingRequestId;
if (requestId && data?.content) {
gatewayClient.sendUserInput(requestId, data.content);
}
}
} else {
// Fallback to HTTP for non-gateway operations
await agentRuntimeService.handleHumanIntervention({
action: action as any,
data,
operationId: messageOpId,
});
}
// Resume loading state
this.#get().internal_toggleMessageLoading(true, assistantId);