mirror of
https://github.com/lobehub/lobe-chat.git
synced 2026-06-21 06:29:59 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5e7b1b7822 | |||
| d27cb4e087 | |||
| c2dfda2f06 | |||
| 58a91a229f |
@@ -198,6 +198,7 @@
|
||||
"@icons-pack/react-simple-icons": "^13.8.0",
|
||||
"@khmyznikov/pwa-install": "0.3.9",
|
||||
"@lexical/utils": "^0.42.0",
|
||||
"@lobechat/agent-gateway-client": "workspace:*",
|
||||
"@lobechat/agent-runtime": "workspace:*",
|
||||
"@lobechat/agent-templates": "workspace:*",
|
||||
"@lobechat/builtin-agent-onboarding": "workspace:*",
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
{
|
||||
"name": "@lobechat/agent-gateway-client",
|
||||
"version": "1.0.0",
|
||||
"private": true,
|
||||
"exports": {
|
||||
".": "./src/index.ts"
|
||||
},
|
||||
"main": "./src/index.ts",
|
||||
"scripts": {
|
||||
"test": "bunx vitest run --silent='passed-only'",
|
||||
"test:coverage": "bunx vitest run --coverage"
|
||||
},
|
||||
"devDependencies": {
|
||||
"vitest": "^3.0.0"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,654 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { AgentGatewayClient } from '../client';
|
||||
import type { ServerMessage } from '../types';
|
||||
|
||||
// ─── WebSocket Mock ───
|
||||
|
||||
class MockWebSocket {
|
||||
static CONNECTING = 0;
|
||||
static OPEN = 1;
|
||||
static CLOSING = 2;
|
||||
static CLOSED = 3;
|
||||
|
||||
readyState = MockWebSocket.CONNECTING;
|
||||
url: string;
|
||||
|
||||
private eventListeners = new Map<string, Set<(...args: any[]) => void>>();
|
||||
|
||||
constructor(url: string) {
|
||||
this.url = url;
|
||||
// Simulate async open via microtask (not affected by fake timers)
|
||||
Promise.resolve().then(() => {
|
||||
if (this.readyState === MockWebSocket.CONNECTING) {
|
||||
this.readyState = MockWebSocket.OPEN;
|
||||
this.dispatchEvent('open');
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
addEventListener(event: string, listener: (...args: any[]) => void): void {
|
||||
let set = this.eventListeners.get(event);
|
||||
if (!set) {
|
||||
set = new Set();
|
||||
this.eventListeners.set(event, set);
|
||||
}
|
||||
set.add(listener);
|
||||
}
|
||||
|
||||
removeEventListener(event: string, listener: (...args: any[]) => void): void {
|
||||
this.eventListeners.get(event)?.delete(listener);
|
||||
}
|
||||
|
||||
send = vi.fn();
|
||||
|
||||
close = vi.fn((code?: number, reason?: string) => {
|
||||
this.readyState = MockWebSocket.CLOSED;
|
||||
this.dispatchEvent('close', { code, reason });
|
||||
});
|
||||
|
||||
// Test helpers
|
||||
dispatchEvent(event: string, data?: any): void {
|
||||
const set = this.eventListeners.get(event);
|
||||
if (set) {
|
||||
for (const listener of set) {
|
||||
listener(data || {});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
simulateMessage(message: ServerMessage): void {
|
||||
this.dispatchEvent('message', { data: JSON.stringify(message) });
|
||||
}
|
||||
}
|
||||
|
||||
let mockWsInstances: MockWebSocket[] = [];
|
||||
|
||||
beforeEach(() => {
|
||||
mockWsInstances = [];
|
||||
vi.stubGlobal(
|
||||
'WebSocket',
|
||||
Object.assign(
|
||||
class extends MockWebSocket {
|
||||
constructor(url: string) {
|
||||
super(url);
|
||||
mockWsInstances.push(this);
|
||||
}
|
||||
},
|
||||
{
|
||||
CONNECTING: MockWebSocket.CONNECTING,
|
||||
OPEN: MockWebSocket.OPEN,
|
||||
CLOSING: MockWebSocket.CLOSING,
|
||||
CLOSED: MockWebSocket.CLOSED,
|
||||
},
|
||||
),
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
const createClient = (overrides?: Partial<ConstructorParameters<typeof AgentGatewayClient>[0]>) =>
|
||||
new AgentGatewayClient({
|
||||
gatewayUrl: 'https://gateway.example.com',
|
||||
token: 'test-token',
|
||||
...overrides,
|
||||
});
|
||||
|
||||
const getLastWs = () => mockWsInstances.at(-1)!;
|
||||
|
||||
describe('AgentGatewayClient', () => {
|
||||
describe('constructor', () => {
|
||||
it('should initialize with disconnected status', () => {
|
||||
const client = createClient();
|
||||
expect(client.connectionStatus).toBe('disconnected');
|
||||
});
|
||||
});
|
||||
|
||||
describe('connect', () => {
|
||||
it('should create WebSocket with correct URL', () => {
|
||||
const client = createClient();
|
||||
client.connect('test-chat-key');
|
||||
|
||||
const ws = getLastWs();
|
||||
expect(ws.url).toBe('wss://gateway.example.com/ws?chatKey=test-chat-key');
|
||||
});
|
||||
|
||||
it('should use ws:// for http:// gateway URLs', () => {
|
||||
const client = createClient({ gatewayUrl: 'http://localhost:8080' });
|
||||
client.connect('key');
|
||||
|
||||
const ws = getLastWs();
|
||||
expect(ws.url).toBe('ws://localhost:8080/ws?chatKey=key');
|
||||
});
|
||||
|
||||
it('should set status to connecting', () => {
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
expect(client.connectionStatus).toBe('connecting');
|
||||
});
|
||||
|
||||
it('should not create multiple connections', () => {
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
client.connect('key'); // second call should be no-op
|
||||
|
||||
expect(mockWsInstances).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('should send auth message on open', async () => {
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
|
||||
// Wait for async open
|
||||
await vi.waitFor(() => {
|
||||
expect(getLastWs().send).toHaveBeenCalledWith(
|
||||
JSON.stringify({ token: 'test-token', type: 'auth' }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set status to authenticating on open', async () => {
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(client.connectionStatus).toBe('authenticating');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('authentication', () => {
|
||||
it('should emit connected and set status on auth_success', async () => {
|
||||
const client = createClient();
|
||||
const connectedHandler = vi.fn();
|
||||
client.on('connected', connectedHandler);
|
||||
client.connect('key');
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(client.connectionStatus).toBe('authenticating');
|
||||
});
|
||||
|
||||
getLastWs().simulateMessage({ type: 'auth_success' });
|
||||
|
||||
expect(client.connectionStatus).toBe('connected');
|
||||
expect(connectedHandler).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it('should emit auth_failed and disconnect on auth_failed', async () => {
|
||||
const client = createClient();
|
||||
const authFailedHandler = vi.fn();
|
||||
client.on('auth_failed', authFailedHandler);
|
||||
client.connect('key');
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(client.connectionStatus).toBe('authenticating');
|
||||
});
|
||||
|
||||
getLastWs().simulateMessage({ type: 'auth_failed', reason: 'Invalid token' });
|
||||
|
||||
expect(authFailedHandler).toHaveBeenCalledWith('Invalid token');
|
||||
expect(client.connectionStatus).toBe('disconnected');
|
||||
});
|
||||
});
|
||||
|
||||
describe('disconnect', () => {
|
||||
it('should close WebSocket and set status to disconnected', async () => {
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(client.connectionStatus).toBe('authenticating');
|
||||
});
|
||||
|
||||
client.disconnect();
|
||||
|
||||
expect(client.connectionStatus).toBe('disconnected');
|
||||
expect(getLastWs().close).toHaveBeenCalledWith(1000, 'Client disconnect');
|
||||
});
|
||||
|
||||
it('should preserve event listeners after disconnect', async () => {
|
||||
const client = createClient();
|
||||
const handler = vi.fn();
|
||||
client.on('connected', handler);
|
||||
|
||||
client.connect('key');
|
||||
await vi.waitFor(() => expect(client.connectionStatus).toBe('authenticating'));
|
||||
|
||||
client.disconnect();
|
||||
|
||||
// Re-connect and verify handler still works
|
||||
client.connect('key2');
|
||||
await vi.waitFor(() => expect(client.connectionStatus).toBe('authenticating'));
|
||||
|
||||
getLastWs().simulateMessage({ type: 'auth_success' });
|
||||
expect(handler).toHaveBeenCalledOnce();
|
||||
});
|
||||
});
|
||||
|
||||
describe('dispose', () => {
|
||||
it('should disconnect and remove all listeners', async () => {
|
||||
const client = createClient();
|
||||
const handler = vi.fn();
|
||||
client.on('connected', handler);
|
||||
|
||||
client.connect('key');
|
||||
await vi.waitFor(() => expect(client.connectionStatus).toBe('authenticating'));
|
||||
|
||||
client.dispose();
|
||||
expect(client.connectionStatus).toBe('disconnected');
|
||||
});
|
||||
});
|
||||
|
||||
describe('event handling', () => {
|
||||
const setupConnectedClient = async () => {
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
await vi.waitFor(() => expect(client.connectionStatus).toBe('authenticating'));
|
||||
getLastWs().simulateMessage({ type: 'auth_success' });
|
||||
expect(client.connectionStatus).toBe('connected');
|
||||
return client;
|
||||
};
|
||||
|
||||
it('should emit agent_event', async () => {
|
||||
const client = await setupConnectedClient();
|
||||
const handler = vi.fn();
|
||||
client.on('agent_event', handler);
|
||||
|
||||
const msg = {
|
||||
type: 'agent_event' as const,
|
||||
id: 'evt-1',
|
||||
event: { kind: 'text_delta' as const, content: 'hello' },
|
||||
};
|
||||
getLastWs().simulateMessage(msg);
|
||||
|
||||
expect(handler).toHaveBeenCalledWith(msg);
|
||||
});
|
||||
|
||||
it('should emit tool_confirmation_request', async () => {
|
||||
const client = await setupConnectedClient();
|
||||
const handler = vi.fn();
|
||||
client.on('tool_confirmation_request', handler);
|
||||
|
||||
const msg = {
|
||||
type: 'tool_confirmation_request' as const,
|
||||
id: 'evt-2',
|
||||
toolCallId: 'tc-1',
|
||||
tool: { apiName: 'test', arguments: '{}', identifier: 'test', name: 'test' },
|
||||
};
|
||||
getLastWs().simulateMessage(msg);
|
||||
|
||||
expect(handler).toHaveBeenCalledWith(msg);
|
||||
});
|
||||
|
||||
it('should emit input_request', async () => {
|
||||
const client = await setupConnectedClient();
|
||||
const handler = vi.fn();
|
||||
client.on('input_request', handler);
|
||||
|
||||
const msg = {
|
||||
type: 'input_request' as const,
|
||||
id: 'evt-3',
|
||||
requestId: 'req-1',
|
||||
prompt: 'Enter value',
|
||||
};
|
||||
getLastWs().simulateMessage(msg);
|
||||
|
||||
expect(handler).toHaveBeenCalledWith(msg);
|
||||
});
|
||||
|
||||
it('should emit session_complete', async () => {
|
||||
const client = await setupConnectedClient();
|
||||
const handler = vi.fn();
|
||||
client.on('session_complete', handler);
|
||||
|
||||
const msg = {
|
||||
type: 'session_complete' as const,
|
||||
id: 'evt-4',
|
||||
summary: 'Done',
|
||||
};
|
||||
getLastWs().simulateMessage(msg);
|
||||
|
||||
expect(handler).toHaveBeenCalledWith(msg);
|
||||
});
|
||||
|
||||
it('should emit error for error messages', async () => {
|
||||
const client = await setupConnectedClient();
|
||||
const handler = vi.fn();
|
||||
client.on('error', handler);
|
||||
|
||||
getLastWs().simulateMessage({
|
||||
type: 'error',
|
||||
id: 'evt-5',
|
||||
code: 'ERR',
|
||||
message: 'Something failed',
|
||||
});
|
||||
|
||||
expect(handler).toHaveBeenCalledWith(expect.any(Error));
|
||||
expect((handler.mock.calls[0][0] as Error).message).toBe('Something failed');
|
||||
});
|
||||
|
||||
it('should warn on malformed messages instead of silently ignoring', async () => {
|
||||
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
|
||||
const client = await setupConnectedClient();
|
||||
|
||||
getLastWs().dispatchEvent('message', { data: 'not json' });
|
||||
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
'[AgentGatewayClient] Failed to parse message:',
|
||||
expect.any(Error),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('client commands', () => {
|
||||
it('should send interrupt message', async () => {
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
await vi.waitFor(() => expect(client.connectionStatus).toBe('authenticating'));
|
||||
|
||||
// Make ws OPEN
|
||||
const ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.send.mockClear();
|
||||
|
||||
client.sendInterrupt();
|
||||
expect(ws.send).toHaveBeenCalledWith(JSON.stringify({ type: 'interrupt' }));
|
||||
});
|
||||
|
||||
it('should send tool confirmation', async () => {
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
const ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.send.mockClear();
|
||||
|
||||
client.sendToolConfirmation('tc-1', true);
|
||||
expect(ws.send).toHaveBeenCalledWith(
|
||||
JSON.stringify({ approved: true, toolCallId: 'tc-1', type: 'tool_confirmation' }),
|
||||
);
|
||||
});
|
||||
|
||||
it('should send user input', async () => {
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
const ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.send.mockClear();
|
||||
|
||||
client.sendUserInput('req-1', 'user response');
|
||||
expect(ws.send).toHaveBeenCalledWith(
|
||||
JSON.stringify({ content: 'user response', requestId: 'req-1', type: 'user_input' }),
|
||||
);
|
||||
});
|
||||
|
||||
it('should not throw when sending on closed socket', () => {
|
||||
// Don't connect — socket is null
|
||||
expect(() => createClient().sendInterrupt()).not.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
describe('heartbeat', () => {
|
||||
it('should send heartbeat after auth_success', async () => {
|
||||
vi.useFakeTimers();
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
|
||||
const ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.dispatchEvent('open');
|
||||
ws.simulateMessage({ type: 'auth_success' });
|
||||
|
||||
ws.send.mockClear();
|
||||
|
||||
// Advance past heartbeat interval
|
||||
vi.advanceTimersByTime(30_000);
|
||||
|
||||
expect(ws.send).toHaveBeenCalledWith(JSON.stringify({ type: 'heartbeat' }));
|
||||
});
|
||||
|
||||
it('should close connection on heartbeat ack timeout', async () => {
|
||||
vi.useFakeTimers();
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
|
||||
const ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.dispatchEvent('open');
|
||||
ws.simulateMessage({ type: 'auth_success' });
|
||||
|
||||
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
|
||||
|
||||
// Send heartbeat
|
||||
vi.advanceTimersByTime(30_000);
|
||||
// Don't send ack — wait for timeout
|
||||
vi.advanceTimersByTime(10_000);
|
||||
|
||||
expect(ws.close).toHaveBeenCalledWith(4000, 'Heartbeat timeout');
|
||||
warnSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('should clear ack timer on heartbeat_ack', async () => {
|
||||
vi.useFakeTimers();
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
|
||||
const ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.dispatchEvent('open');
|
||||
ws.simulateMessage({ type: 'auth_success' });
|
||||
|
||||
// Send heartbeat
|
||||
vi.advanceTimersByTime(30_000);
|
||||
// Ack received
|
||||
ws.simulateMessage({ type: 'heartbeat_ack' });
|
||||
// Advance past timeout — should NOT close
|
||||
vi.advanceTimersByTime(10_000);
|
||||
|
||||
// close should not have been called (only from our test, not from timeout)
|
||||
expect(ws.close).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('reconnection', () => {
|
||||
it('should reconnect on unexpected close', async () => {
|
||||
vi.useFakeTimers();
|
||||
const client = createClient();
|
||||
const reconnectHandler = vi.fn();
|
||||
client.on('reconnecting', reconnectHandler);
|
||||
|
||||
client.connect('key');
|
||||
const ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.dispatchEvent('open');
|
||||
ws.simulateMessage({ type: 'auth_success' });
|
||||
|
||||
// Simulate unexpected close
|
||||
ws.readyState = MockWebSocket.CLOSED;
|
||||
ws.dispatchEvent('close');
|
||||
|
||||
expect(client.connectionStatus).toBe('reconnecting');
|
||||
expect(reconnectHandler).toHaveBeenCalledWith(1000); // initial delay
|
||||
|
||||
// Advance to trigger reconnect
|
||||
vi.advanceTimersByTime(1000);
|
||||
expect(mockWsInstances).toHaveLength(2);
|
||||
});
|
||||
|
||||
it('should use exponential backoff', async () => {
|
||||
vi.useFakeTimers();
|
||||
const client = createClient({ maxReconnectAttempts: 0 }); // unlimited
|
||||
const delays: number[] = [];
|
||||
client.on('reconnecting', (delay: number) => delays.push(delay));
|
||||
|
||||
client.connect('key');
|
||||
|
||||
// First WS opens, auth succeeds, then closes unexpectedly
|
||||
let ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.dispatchEvent('open');
|
||||
ws.simulateMessage({ type: 'auth_success' });
|
||||
|
||||
// Close triggers reconnect with delay[0]
|
||||
ws.readyState = MockWebSocket.CLOSED;
|
||||
ws.dispatchEvent('close');
|
||||
vi.advanceTimersByTime(delays.at(-1)!);
|
||||
|
||||
// Second WS - close again
|
||||
ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.CLOSED;
|
||||
ws.dispatchEvent('close');
|
||||
vi.advanceTimersByTime(delays.at(-1)!);
|
||||
|
||||
// Third WS - close again
|
||||
ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.CLOSED;
|
||||
ws.dispatchEvent('close');
|
||||
vi.advanceTimersByTime(delays.at(-1)!);
|
||||
|
||||
// Fourth WS - close again
|
||||
ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.CLOSED;
|
||||
ws.dispatchEvent('close');
|
||||
|
||||
expect(delays).toEqual([1000, 2000, 4000, 8000]);
|
||||
});
|
||||
|
||||
it('should stop after max reconnect attempts', async () => {
|
||||
vi.useFakeTimers();
|
||||
const client = createClient({ maxReconnectAttempts: 2 });
|
||||
const disconnectedHandler = vi.fn();
|
||||
const errorHandler = vi.fn();
|
||||
client.on('disconnected', disconnectedHandler);
|
||||
client.on('error', errorHandler);
|
||||
|
||||
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
|
||||
|
||||
client.connect('key');
|
||||
|
||||
// First WS opens and then closes
|
||||
let ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.dispatchEvent('open');
|
||||
ws.simulateMessage({ type: 'auth_success' });
|
||||
|
||||
// Close -> reconnect attempt 1
|
||||
ws.readyState = MockWebSocket.CLOSED;
|
||||
ws.dispatchEvent('close');
|
||||
vi.advanceTimersByTime(1000);
|
||||
|
||||
// Second WS close -> reconnect attempt 2
|
||||
ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.CLOSED;
|
||||
ws.dispatchEvent('close');
|
||||
vi.advanceTimersByTime(2000);
|
||||
|
||||
// Third WS close -> attempt 3, exceeds max (2)
|
||||
ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.CLOSED;
|
||||
ws.dispatchEvent('close');
|
||||
|
||||
// Should stop reconnecting
|
||||
expect(client.connectionStatus).toBe('disconnected');
|
||||
expect(disconnectedHandler).toHaveBeenCalled();
|
||||
expect(errorHandler).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ message: 'Max reconnect attempts reached' }),
|
||||
);
|
||||
|
||||
warnSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('should not reconnect on intentional disconnect', async () => {
|
||||
vi.useFakeTimers();
|
||||
const client = createClient();
|
||||
|
||||
client.connect('key');
|
||||
const ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
|
||||
client.disconnect();
|
||||
|
||||
// Should not create new connections
|
||||
vi.advanceTimersByTime(60_000);
|
||||
expect(mockWsInstances).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('should not reconnect when autoReconnect is false', async () => {
|
||||
const client = createClient({ autoReconnect: false });
|
||||
const disconnectedHandler = vi.fn();
|
||||
client.on('disconnected', disconnectedHandler);
|
||||
|
||||
client.connect('key');
|
||||
const ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.CLOSED;
|
||||
ws.dispatchEvent('close');
|
||||
|
||||
expect(client.connectionStatus).toBe('disconnected');
|
||||
expect(disconnectedHandler).toHaveBeenCalled();
|
||||
expect(mockWsInstances).toHaveLength(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('resume', () => {
|
||||
it('should send resume with lastEventId on reconnect', async () => {
|
||||
vi.useFakeTimers();
|
||||
const client = createClient();
|
||||
client.connect('key');
|
||||
|
||||
let ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.dispatchEvent('open');
|
||||
ws.simulateMessage({ type: 'auth_success' });
|
||||
|
||||
// Receive an event to set lastEventId
|
||||
ws.simulateMessage({
|
||||
type: 'agent_event',
|
||||
id: 'evt-99',
|
||||
event: { kind: 'text_delta', content: 'hi' },
|
||||
});
|
||||
|
||||
// Simulate disconnect and reconnect
|
||||
ws.readyState = MockWebSocket.CLOSED;
|
||||
ws.dispatchEvent('close');
|
||||
vi.advanceTimersByTime(1000);
|
||||
|
||||
ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.dispatchEvent('open');
|
||||
ws.simulateMessage({ type: 'auth_success' });
|
||||
|
||||
expect(ws.send).toHaveBeenCalledWith(
|
||||
JSON.stringify({ lastEventId: 'evt-99', type: 'resume' }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('updateToken', () => {
|
||||
it('should use updated token on next auth', async () => {
|
||||
vi.useFakeTimers();
|
||||
const client = createClient({ token: 'old-token' });
|
||||
client.connect('key');
|
||||
|
||||
let ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.dispatchEvent('open');
|
||||
|
||||
// First auth uses old token
|
||||
expect(ws.send).toHaveBeenCalledWith(JSON.stringify({ token: 'old-token', type: 'auth' }));
|
||||
|
||||
// Update token and reconnect
|
||||
client.updateToken('new-token');
|
||||
ws.simulateMessage({ type: 'auth_failed', reason: 'expired' });
|
||||
|
||||
// Reconnect with new token
|
||||
client.connect('key');
|
||||
ws = getLastWs();
|
||||
ws.readyState = MockWebSocket.OPEN;
|
||||
ws.dispatchEvent('open');
|
||||
|
||||
expect(ws.send).toHaveBeenCalledWith(JSON.stringify({ token: 'new-token', type: 'auth' }));
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,407 @@
|
||||
import type {
|
||||
AgentEventMessage,
|
||||
AgentGatewayClientEvents,
|
||||
ClientMessage,
|
||||
ConnectionStatus,
|
||||
ErrorMessage,
|
||||
IAgentGatewayClient,
|
||||
InputRequestMessage,
|
||||
ServerMessage,
|
||||
SessionCompleteMessage,
|
||||
StatusChangeMessage,
|
||||
ToolConfirmationRequestMessage,
|
||||
} from './types';
|
||||
|
||||
// ─── Constants ───
|
||||
|
||||
const HEARTBEAT_INTERVAL = 30_000; // 30s
|
||||
const HEARTBEAT_TIMEOUT = 10_000; // 10s — if no ack within this window, consider dead
|
||||
const INITIAL_RECONNECT_DELAY = 1000; // 1s
|
||||
const MAX_RECONNECT_DELAY = 30_000; // 30s
|
||||
const DEFAULT_MAX_RECONNECT_ATTEMPTS = 10;
|
||||
|
||||
// ─── Minimal EventEmitter for Browser ───
|
||||
|
||||
type Listener = (...args: any[]) => void;
|
||||
|
||||
class BrowserEventEmitter {
|
||||
private listeners = new Map<string, Set<Listener>>();
|
||||
|
||||
on(event: string, listener: Listener): void {
|
||||
let set = this.listeners.get(event);
|
||||
if (!set) {
|
||||
set = new Set();
|
||||
this.listeners.set(event, set);
|
||||
}
|
||||
set.add(listener);
|
||||
}
|
||||
|
||||
off(event: string, listener: Listener): void {
|
||||
this.listeners.get(event)?.delete(listener);
|
||||
}
|
||||
|
||||
emit(event: string, ...args: any[]): void {
|
||||
const set = this.listeners.get(event);
|
||||
if (set) {
|
||||
for (const listener of set) {
|
||||
listener(...args);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
removeAllListeners(): void {
|
||||
this.listeners.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Client Options ───
|
||||
|
||||
export interface AgentGatewayClientOptions {
|
||||
/** Auto-reconnect on disconnection (default: true) */
|
||||
autoReconnect?: boolean;
|
||||
/** Gateway base URL (e.g. https://agent-gateway.lobehub.com) */
|
||||
gatewayUrl: string;
|
||||
/** Max reconnect attempts before giving up (default: 10, 0 = unlimited) */
|
||||
maxReconnectAttempts?: number;
|
||||
/** JWT token for authentication */
|
||||
token: string;
|
||||
}
|
||||
|
||||
// ─── Agent Gateway Client ───
|
||||
|
||||
export class AgentGatewayClient implements IAgentGatewayClient {
|
||||
private ws: WebSocket | null = null;
|
||||
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
|
||||
private heartbeatAckTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
private reconnectDelay = INITIAL_RECONNECT_DELAY;
|
||||
private reconnectAttempts = 0;
|
||||
private status: ConnectionStatus = 'disconnected';
|
||||
private intentionalDisconnect = false;
|
||||
|
||||
private chatKey = '';
|
||||
private lastEventId: string | undefined;
|
||||
|
||||
private gatewayUrl: string;
|
||||
private token: string;
|
||||
private autoReconnect: boolean;
|
||||
private maxReconnectAttempts: number;
|
||||
|
||||
private emitter = new BrowserEventEmitter();
|
||||
|
||||
constructor(options: AgentGatewayClientOptions) {
|
||||
this.gatewayUrl = options.gatewayUrl;
|
||||
this.token = options.token;
|
||||
this.autoReconnect = options.autoReconnect ?? true;
|
||||
this.maxReconnectAttempts = options.maxReconnectAttempts ?? DEFAULT_MAX_RECONNECT_ATTEMPTS;
|
||||
}
|
||||
|
||||
// ─── Public API ───
|
||||
|
||||
get connectionStatus(): ConnectionStatus {
|
||||
return this.status;
|
||||
}
|
||||
|
||||
on<K extends keyof AgentGatewayClientEvents>(
|
||||
event: K,
|
||||
listener: AgentGatewayClientEvents[K],
|
||||
): void {
|
||||
this.emitter.on(event, listener as Listener);
|
||||
}
|
||||
|
||||
off<K extends keyof AgentGatewayClientEvents>(
|
||||
event: K,
|
||||
listener: AgentGatewayClientEvents[K],
|
||||
): void {
|
||||
this.emitter.off(event, listener as Listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to a specific agent session.
|
||||
*/
|
||||
connect(chatKey: string): void {
|
||||
if (this.status === 'connected' || this.status === 'connecting') return;
|
||||
|
||||
this.chatKey = chatKey;
|
||||
this.intentionalDisconnect = false;
|
||||
this.reconnectAttempts = 0;
|
||||
this.doConnect();
|
||||
}
|
||||
|
||||
disconnect(): void {
|
||||
this.intentionalDisconnect = true;
|
||||
this.cleanupConnection();
|
||||
this.setStatus('disconnected');
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the auth token (e.g. after refresh).
|
||||
*/
|
||||
updateToken(token: string): void {
|
||||
this.token = token;
|
||||
}
|
||||
|
||||
// ─── Client → Server Commands ───
|
||||
|
||||
sendInterrupt(): void {
|
||||
this.send({ type: 'interrupt' });
|
||||
}
|
||||
|
||||
sendToolConfirmation(toolCallId: string, approved: boolean): void {
|
||||
this.send({ approved, toolCallId, type: 'tool_confirmation' });
|
||||
}
|
||||
|
||||
sendUserInput(requestId: string, content: string): void {
|
||||
this.send({ content, requestId, type: 'user_input' });
|
||||
}
|
||||
|
||||
// ─── Connection Logic ───
|
||||
|
||||
private doConnect(): void {
|
||||
this.clearReconnectTimer();
|
||||
this.setStatus('connecting');
|
||||
|
||||
try {
|
||||
const wsUrl = this.buildWsUrl();
|
||||
const ws = new WebSocket(wsUrl);
|
||||
|
||||
ws.addEventListener('open', this.handleOpen);
|
||||
ws.addEventListener('message', this.handleMessage);
|
||||
ws.addEventListener('close', this.handleClose);
|
||||
ws.addEventListener('error', this.handleError);
|
||||
|
||||
this.ws = ws;
|
||||
} catch {
|
||||
this.setStatus('disconnected');
|
||||
if (this.autoReconnect) {
|
||||
this.scheduleReconnect();
|
||||
} else {
|
||||
this.emitter.emit('disconnected');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private buildWsUrl(): string {
|
||||
const wsProtocol = this.gatewayUrl.startsWith('https') ? 'wss' : 'ws';
|
||||
const host = this.gatewayUrl.replace(/^https?:\/\//, '');
|
||||
const params = new URLSearchParams({ chatKey: this.chatKey });
|
||||
return `${wsProtocol}://${host}/ws?${params.toString()}`;
|
||||
}
|
||||
|
||||
// ─── WebSocket Event Handlers ───
|
||||
|
||||
private handleOpen = (): void => {
|
||||
this.reconnectDelay = INITIAL_RECONNECT_DELAY;
|
||||
this.reconnectAttempts = 0;
|
||||
this.setStatus('authenticating');
|
||||
|
||||
// Authenticate
|
||||
this.send({ token: this.token, type: 'auth' });
|
||||
};
|
||||
|
||||
private handleMessage = (ev: MessageEvent): void => {
|
||||
try {
|
||||
const message = JSON.parse(ev.data as string) as ServerMessage;
|
||||
|
||||
switch (message.type) {
|
||||
case 'auth_success': {
|
||||
this.setStatus('connected');
|
||||
this.startHeartbeat();
|
||||
this.emitter.emit('connected');
|
||||
|
||||
// Resume from last event if reconnecting
|
||||
if (this.lastEventId) {
|
||||
this.send({ lastEventId: this.lastEventId, type: 'resume' });
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'auth_failed': {
|
||||
this.emitter.emit('auth_failed', message.reason);
|
||||
this.disconnect();
|
||||
break;
|
||||
}
|
||||
|
||||
case 'heartbeat_ack': {
|
||||
this.clearHeartbeatAckTimer();
|
||||
break;
|
||||
}
|
||||
|
||||
case 'agent_event': {
|
||||
this.lastEventId = message.id;
|
||||
this.emitter.emit('agent_event', message as AgentEventMessage);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'status_change': {
|
||||
this.lastEventId = message.id;
|
||||
this.emitter.emit('status_update', message as StatusChangeMessage);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'tool_confirmation_request': {
|
||||
this.lastEventId = message.id;
|
||||
this.emitter.emit('tool_confirmation_request', message as ToolConfirmationRequestMessage);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'input_request': {
|
||||
this.lastEventId = message.id;
|
||||
this.emitter.emit('input_request', message as InputRequestMessage);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'session_complete': {
|
||||
this.lastEventId = message.id;
|
||||
this.emitter.emit('session_complete', message as SessionCompleteMessage);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'error': {
|
||||
this.lastEventId = (message as ErrorMessage).id;
|
||||
this.emitter.emit('error', new Error((message as ErrorMessage).message));
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.warn('[AgentGatewayClient] Failed to parse message:', error);
|
||||
}
|
||||
};
|
||||
|
||||
private handleClose = (): void => {
|
||||
this.stopHeartbeat();
|
||||
this.ws = null;
|
||||
|
||||
if (!this.intentionalDisconnect && this.autoReconnect) {
|
||||
this.setStatus('reconnecting');
|
||||
this.scheduleReconnect();
|
||||
} else {
|
||||
this.setStatus('disconnected');
|
||||
this.emitter.emit('disconnected');
|
||||
}
|
||||
};
|
||||
|
||||
private handleError = (ev: Event): void => {
|
||||
this.emitter.emit('error', ev);
|
||||
};
|
||||
|
||||
// ─── Heartbeat ───
|
||||
|
||||
private startHeartbeat(): void {
|
||||
this.stopHeartbeat();
|
||||
this.heartbeatTimer = setInterval(() => {
|
||||
this.send({ type: 'heartbeat' });
|
||||
this.startHeartbeatAckTimer();
|
||||
}, HEARTBEAT_INTERVAL);
|
||||
}
|
||||
|
||||
private stopHeartbeat(): void {
|
||||
if (this.heartbeatTimer) {
|
||||
clearInterval(this.heartbeatTimer);
|
||||
this.heartbeatTimer = null;
|
||||
}
|
||||
this.clearHeartbeatAckTimer();
|
||||
}
|
||||
|
||||
private startHeartbeatAckTimer(): void {
|
||||
this.clearHeartbeatAckTimer();
|
||||
this.heartbeatAckTimer = setTimeout(() => {
|
||||
// No ack received — connection is likely dead
|
||||
console.warn('[AgentGatewayClient] Heartbeat ack timeout, closing connection');
|
||||
this.ws?.close(4000, 'Heartbeat timeout');
|
||||
}, HEARTBEAT_TIMEOUT);
|
||||
}
|
||||
|
||||
private clearHeartbeatAckTimer(): void {
|
||||
if (this.heartbeatAckTimer) {
|
||||
clearTimeout(this.heartbeatAckTimer);
|
||||
this.heartbeatAckTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Reconnection (exponential backoff) ───
|
||||
|
||||
private scheduleReconnect(): void {
|
||||
// Check max attempts (0 = unlimited)
|
||||
if (this.maxReconnectAttempts > 0 && this.reconnectAttempts >= this.maxReconnectAttempts) {
|
||||
console.warn(
|
||||
`[AgentGatewayClient] Max reconnect attempts (${this.maxReconnectAttempts}) reached`,
|
||||
);
|
||||
this.setStatus('disconnected');
|
||||
this.emitter.emit('disconnected');
|
||||
this.emitter.emit('error', new Error('Max reconnect attempts reached'));
|
||||
return;
|
||||
}
|
||||
|
||||
this.clearReconnectTimer();
|
||||
this.reconnectAttempts++;
|
||||
|
||||
const delay = this.reconnectDelay;
|
||||
this.emitter.emit('reconnecting', delay);
|
||||
|
||||
this.reconnectTimer = setTimeout(() => {
|
||||
this.reconnectTimer = null;
|
||||
this.doConnect();
|
||||
}, delay);
|
||||
|
||||
// Exponential backoff: 1s → 2s → 4s → 8s → ... → 30s
|
||||
this.reconnectDelay = Math.min(this.reconnectDelay * 2, MAX_RECONNECT_DELAY);
|
||||
}
|
||||
|
||||
private clearReconnectTimer(): void {
|
||||
if (this.reconnectTimer) {
|
||||
clearTimeout(this.reconnectTimer);
|
||||
this.reconnectTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Status ───
|
||||
|
||||
private setStatus(status: ConnectionStatus): void {
|
||||
if (this.status === status) return;
|
||||
this.status = status;
|
||||
this.emitter.emit('status_changed', status);
|
||||
}
|
||||
|
||||
// ─── Helpers ───
|
||||
|
||||
/**
|
||||
* Send a message to the gateway. Returns false if the socket is not open.
|
||||
*/
|
||||
private send(data: ClientMessage): boolean {
|
||||
if (this.ws?.readyState === WebSocket.OPEN) {
|
||||
this.ws.send(JSON.stringify(data));
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up WebSocket connection and timers, but preserve emitter listeners.
|
||||
* Emitter listeners are registered by the consumer and should survive reconnects.
|
||||
*/
|
||||
private cleanupConnection(): void {
|
||||
this.stopHeartbeat();
|
||||
this.clearReconnectTimer();
|
||||
|
||||
if (this.ws) {
|
||||
this.ws.removeEventListener('open', this.handleOpen);
|
||||
this.ws.removeEventListener('message', this.handleMessage);
|
||||
this.ws.removeEventListener('close', this.handleClose);
|
||||
this.ws.removeEventListener('error', this.handleError);
|
||||
|
||||
if (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING) {
|
||||
this.ws.close(1000, 'Client disconnect');
|
||||
}
|
||||
this.ws = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Full cleanup including emitter listeners. Call only when the client is being disposed.
|
||||
*/
|
||||
dispose(): void {
|
||||
this.disconnect();
|
||||
this.emitter.removeAllListeners();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
export type { AgentGatewayClientOptions } from './client';
|
||||
export { AgentGatewayClient } from './client';
|
||||
export type { IAgentGatewayClient } from './types';
|
||||
export * from './types';
|
||||
@@ -0,0 +1,176 @@
|
||||
// ─── Session State ───
|
||||
|
||||
export type SessionStatus =
|
||||
| 'completed'
|
||||
| 'error'
|
||||
| 'interrupted'
|
||||
| 'running'
|
||||
| 'waiting_confirmation'
|
||||
| 'waiting_input';
|
||||
|
||||
// ─── Agent Stream Events (pushed from backend via Gateway) ───
|
||||
|
||||
export type AgentStreamEvent =
|
||||
| { content: string; kind: 'text_delta' }
|
||||
| { content: string; kind: 'thinking' }
|
||||
| { kind: 'message_complete'; messageId: string }
|
||||
| { kind: 'step_complete'; stepIndex: number }
|
||||
| { arguments: string; kind: 'tool_call_delta'; toolCallId: string }
|
||||
| { kind: 'tool_call_end'; toolCallId: string }
|
||||
| { arguments: string; kind: 'tool_call_start'; name: string; toolCallId: string }
|
||||
| { kind: 'tool_result'; result: string; toolCallId: string };
|
||||
|
||||
export interface ToolCallInfo {
|
||||
apiName: string;
|
||||
arguments: string;
|
||||
identifier: string;
|
||||
name: string;
|
||||
}
|
||||
|
||||
// ─── Client → Server Messages (Browser → Gateway) ───
|
||||
|
||||
export interface AuthMessage {
|
||||
token: string;
|
||||
type: 'auth';
|
||||
}
|
||||
|
||||
export interface HeartbeatMessage {
|
||||
type: 'heartbeat';
|
||||
}
|
||||
|
||||
export interface InterruptMessage {
|
||||
type: 'interrupt';
|
||||
}
|
||||
|
||||
export interface ToolConfirmationMessage {
|
||||
approved: boolean;
|
||||
toolCallId: string;
|
||||
type: 'tool_confirmation';
|
||||
}
|
||||
|
||||
export interface UserInputMessage {
|
||||
content: string;
|
||||
requestId: string;
|
||||
type: 'user_input';
|
||||
}
|
||||
|
||||
export interface ResumeMessage {
|
||||
lastEventId: string;
|
||||
type: 'resume';
|
||||
}
|
||||
|
||||
export type ClientMessage =
|
||||
| AuthMessage
|
||||
| HeartbeatMessage
|
||||
| InterruptMessage
|
||||
| ResumeMessage
|
||||
| ToolConfirmationMessage
|
||||
| UserInputMessage;
|
||||
|
||||
// ─── Server → Client Messages (Gateway → Browser) ───
|
||||
|
||||
export interface AuthSuccessMessage {
|
||||
type: 'auth_success';
|
||||
}
|
||||
|
||||
export interface AuthFailedMessage {
|
||||
reason: string;
|
||||
type: 'auth_failed';
|
||||
}
|
||||
|
||||
export interface HeartbeatAckMessage {
|
||||
type: 'heartbeat_ack';
|
||||
}
|
||||
|
||||
export interface AgentEventMessage {
|
||||
event: AgentStreamEvent;
|
||||
id: string;
|
||||
type: 'agent_event';
|
||||
}
|
||||
|
||||
export interface StatusChangeMessage {
|
||||
id: string;
|
||||
status: SessionStatus;
|
||||
type: 'status_change';
|
||||
}
|
||||
|
||||
export interface ToolConfirmationRequestMessage {
|
||||
id: string;
|
||||
tool: ToolCallInfo;
|
||||
toolCallId: string;
|
||||
type: 'tool_confirmation_request';
|
||||
}
|
||||
|
||||
export interface InputRequestMessage {
|
||||
id: string;
|
||||
prompt: string;
|
||||
requestId: string;
|
||||
type: 'input_request';
|
||||
}
|
||||
|
||||
export interface SessionCompleteMessage {
|
||||
id: string;
|
||||
summary?: string;
|
||||
type: 'session_complete';
|
||||
}
|
||||
|
||||
export interface ErrorMessage {
|
||||
code: string;
|
||||
id: string;
|
||||
message: string;
|
||||
type: 'error';
|
||||
}
|
||||
|
||||
export type ServerMessage =
|
||||
| AgentEventMessage
|
||||
| AuthFailedMessage
|
||||
| AuthSuccessMessage
|
||||
| ErrorMessage
|
||||
| HeartbeatAckMessage
|
||||
| InputRequestMessage
|
||||
| SessionCompleteMessage
|
||||
| StatusChangeMessage
|
||||
| ToolConfirmationRequestMessage;
|
||||
|
||||
// ─── Client Types ───
|
||||
|
||||
export type ConnectionStatus =
|
||||
| 'authenticating'
|
||||
| 'connected'
|
||||
| 'connecting'
|
||||
| 'disconnected'
|
||||
| 'reconnecting';
|
||||
|
||||
export interface AgentGatewayClientEvents {
|
||||
agent_event: (message: AgentEventMessage) => void;
|
||||
auth_failed: (reason: string) => void;
|
||||
connected: () => void;
|
||||
disconnected: () => void;
|
||||
error: (error: Event | Error) => void;
|
||||
input_request: (message: InputRequestMessage) => void;
|
||||
reconnecting: (delay: number) => void;
|
||||
session_complete: (message: SessionCompleteMessage) => void;
|
||||
status_changed: (status: ConnectionStatus) => void;
|
||||
status_update: (message: StatusChangeMessage) => void;
|
||||
tool_confirmation_request: (message: ToolConfirmationRequestMessage) => void;
|
||||
}
|
||||
|
||||
// ─── Client Interface ───
|
||||
|
||||
export interface IAgentGatewayClient {
|
||||
connect: (chatKey: string) => void;
|
||||
readonly connectionStatus: ConnectionStatus;
|
||||
disconnect: () => void;
|
||||
off: <K extends keyof AgentGatewayClientEvents>(
|
||||
event: K,
|
||||
listener: AgentGatewayClientEvents[K],
|
||||
) => void;
|
||||
on: <K extends keyof AgentGatewayClientEvents>(
|
||||
event: K,
|
||||
listener: AgentGatewayClientEvents[K],
|
||||
) => void;
|
||||
sendInterrupt: () => void;
|
||||
sendToolConfirmation: (toolCallId: string, approved: boolean) => void;
|
||||
sendUserInput: (requestId: string, content: string) => void;
|
||||
updateToken: (token: string) => void;
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
{
|
||||
"extends": "../../tsconfig.json",
|
||||
"include": ["src/"]
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
import { defineConfig } from 'vitest/config';
|
||||
|
||||
export default defineConfig({
|
||||
test: {
|
||||
coverage: {
|
||||
all: false,
|
||||
reporter: ['text', 'json', 'lcov', 'text-summary'],
|
||||
},
|
||||
environment: 'node',
|
||||
},
|
||||
});
|
||||
@@ -47,6 +47,7 @@ export interface ServerModelProviderConfig {
|
||||
export type ServerLanguageModel = Partial<Record<GlobalLLMProviderKey, ServerModelProviderConfig>>;
|
||||
|
||||
export interface GlobalServerConfig {
|
||||
agentGatewayUrl?: string;
|
||||
aiProvider: ServerLanguageModel;
|
||||
defaultAgent?: PartialDeep<UserDefaultAgent>;
|
||||
disableEmailPassword?: boolean;
|
||||
|
||||
@@ -1,857 +0,0 @@
|
||||
// @vitest-environment node
|
||||
import { NextRequest } from 'next/server';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { GET } from '../route';
|
||||
|
||||
// Mock dependencies first
|
||||
const mockStreamEventManager = {
|
||||
getStreamHistory: vi.fn(),
|
||||
subscribeStreamEvents: vi.fn(),
|
||||
};
|
||||
|
||||
vi.mock('@/server/modules/AgentRuntime', () => ({
|
||||
createStreamEventManager: vi.fn(() => mockStreamEventManager),
|
||||
}));
|
||||
|
||||
describe('/api/agent/stream route', () => {
|
||||
const MOCK_TIMESTAMP = 1758203237000;
|
||||
|
||||
beforeEach(() => {
|
||||
vi.resetAllMocks();
|
||||
// Mock Date.now to return consistent timestamp
|
||||
vi.spyOn(Date, 'now').mockReturnValue(MOCK_TIMESTAMP);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('GET handler', () => {
|
||||
it('should return 400 when operationId parameter is missing', async () => {
|
||||
const request = new NextRequest('https://test.com/api/agent/stream');
|
||||
const response = await GET(request);
|
||||
|
||||
expect(response.status).toBe(400);
|
||||
const data = await response.json();
|
||||
expect(data.error).toBe('operationId parameter is required');
|
||||
});
|
||||
|
||||
it('should return SSE stream with correct headers when operationId is provided', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation',
|
||||
);
|
||||
const response = await GET(request);
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
expect(response.headers.get('Content-Type')).toBe('text/event-stream');
|
||||
expect(response.headers.get('Cache-Control')).toBe('no-cache, no-transform');
|
||||
expect(response.headers.get('Connection')).toBe('keep-alive');
|
||||
expect(response.headers.get('Access-Control-Allow-Origin')).toBe('*');
|
||||
expect(response.headers.get('Access-Control-Allow-Methods')).toBe('GET');
|
||||
expect(response.headers.get('Access-Control-Allow-Headers')).toBe(
|
||||
'Cache-Control, Last-Event-ID',
|
||||
);
|
||||
expect(response.headers.get('X-Accel-Buffering')).toBe('no');
|
||||
});
|
||||
});
|
||||
|
||||
describe('Stream functionality with exact data verification', () => {
|
||||
it('should send connection event in exact SSE format', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation&lastEventId=123',
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
const decoder = new TextDecoder();
|
||||
const reader = response.body!.getReader();
|
||||
|
||||
// Collect all chunks
|
||||
const chunks = [];
|
||||
let readCount = 0;
|
||||
const maxReads = 1; // Only read connection event
|
||||
|
||||
try {
|
||||
while (readCount < maxReads) {
|
||||
const readPromise = reader.read();
|
||||
const timeoutPromise = new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Read timeout')), 1000),
|
||||
);
|
||||
|
||||
const result = (await Promise.race([
|
||||
readPromise,
|
||||
timeoutPromise,
|
||||
])) as ReadableStreamReadResult<Uint8Array>;
|
||||
|
||||
if (result.done) break;
|
||||
if (result.value) {
|
||||
const chunk =
|
||||
result.value instanceof Uint8Array
|
||||
? decoder.decode(result.value)
|
||||
: String(result.value);
|
||||
chunks.push(chunk);
|
||||
readCount++;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Timeout or error
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
|
||||
// Verify exact stream format with mocked timestamp (new SSE format)
|
||||
expect(chunks).toEqual([
|
||||
`id: conn_${MOCK_TIMESTAMP}\nevent: connected\ndata: {"lastEventId":"123","operationId":"test-operation","timestamp":${MOCK_TIMESTAMP},"type":"connected"}\n\n`,
|
||||
]);
|
||||
});
|
||||
|
||||
it('should verify getStreamHistory with exact historical events format', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation&includeHistory=true&lastEventId=100',
|
||||
);
|
||||
|
||||
// Mock getStreamHistory to return specific events
|
||||
const mockEvents = [
|
||||
{
|
||||
type: 'stream_end',
|
||||
timestamp: 300,
|
||||
operationId: 'test-operation',
|
||||
data: { messageId: 'msg3' },
|
||||
},
|
||||
{
|
||||
type: 'stream_chunk',
|
||||
timestamp: 250,
|
||||
operationId: 'test-operation',
|
||||
data: { content: 'world' },
|
||||
},
|
||||
{
|
||||
type: 'stream_start',
|
||||
timestamp: 150,
|
||||
operationId: 'test-operation',
|
||||
data: { messageId: 'msg1' },
|
||||
},
|
||||
];
|
||||
mockStreamEventManager.getStreamHistory.mockResolvedValue(mockEvents);
|
||||
|
||||
const response = await GET(request);
|
||||
const decoder = new TextDecoder();
|
||||
const reader = response.body!.getReader();
|
||||
|
||||
// Collect all chunks
|
||||
const chunks = [];
|
||||
let readCount = 0;
|
||||
const maxReads = 3; // connection + 2 filtered historical events (timestamp > 100)
|
||||
|
||||
try {
|
||||
while (readCount < maxReads) {
|
||||
const readPromise = reader.read();
|
||||
const timeoutPromise = new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Read timeout')), 500),
|
||||
);
|
||||
|
||||
const result = (await Promise.race([
|
||||
readPromise,
|
||||
timeoutPromise,
|
||||
])) as ReadableStreamReadResult<Uint8Array>;
|
||||
|
||||
if (result.done) break;
|
||||
if (result.value) {
|
||||
const chunk =
|
||||
result.value instanceof Uint8Array
|
||||
? decoder.decode(result.value)
|
||||
: String(result.value);
|
||||
chunks.push(chunk);
|
||||
readCount++;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Timeout or error
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
|
||||
// Verify exact stream format - connection event + filtered historical events (new SSE format)
|
||||
expect(chunks).toEqual([
|
||||
`id: conn_${MOCK_TIMESTAMP}\nevent: connected\ndata: {"lastEventId":"100","operationId":"test-operation","timestamp":${MOCK_TIMESTAMP},"type":"connected"}\n\n`,
|
||||
`id: test-operation\nevent: stream_start\ndata: {"type":"stream_start","timestamp":150,"operationId":"test-operation","data":{"messageId":"msg1"}}\n\n`,
|
||||
`id: test-operation\nevent: stream_chunk\ndata: {"type":"stream_chunk","timestamp":250,"operationId":"test-operation","data":{"content":"world"}}\n\n`,
|
||||
]);
|
||||
|
||||
// Verify API calls
|
||||
expect(mockStreamEventManager.getStreamHistory).toHaveBeenCalledWith('test-operation', 50);
|
||||
});
|
||||
|
||||
it('should verify event filtering with exact format', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation&includeHistory=true&lastEventId=200',
|
||||
);
|
||||
|
||||
// Mock events where some should be filtered out
|
||||
const mockEvents = [
|
||||
{
|
||||
type: 'stream_end',
|
||||
timestamp: 300,
|
||||
operationId: 'test-operation',
|
||||
data: { messageId: 'msg3' },
|
||||
}, // Should be included (300 > 200)
|
||||
{
|
||||
type: 'stream_chunk',
|
||||
timestamp: 250,
|
||||
operationId: 'test-operation',
|
||||
data: { content: 'world' },
|
||||
}, // Should be included (250 > 200)
|
||||
{
|
||||
type: 'stream_chunk',
|
||||
timestamp: 200,
|
||||
operationId: 'test-operation',
|
||||
data: { content: 'hello' },
|
||||
}, // Should be excluded (200 = 200)
|
||||
{
|
||||
type: 'stream_start',
|
||||
timestamp: 150,
|
||||
operationId: 'test-operation',
|
||||
data: { messageId: 'msg1' },
|
||||
}, // Should be excluded (150 < 200)
|
||||
];
|
||||
mockStreamEventManager.getStreamHistory.mockResolvedValue(mockEvents);
|
||||
|
||||
const response = await GET(request);
|
||||
const decoder = new TextDecoder();
|
||||
const reader = response.body!.getReader();
|
||||
|
||||
// Collect all chunks
|
||||
const chunks = [];
|
||||
let readCount = 0;
|
||||
const maxReads = 3; // connection + 2 filtered events
|
||||
|
||||
try {
|
||||
while (readCount < maxReads) {
|
||||
const readPromise = reader.read();
|
||||
const timeoutPromise = new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Read timeout')), 500),
|
||||
);
|
||||
|
||||
const result = (await Promise.race([
|
||||
readPromise,
|
||||
timeoutPromise,
|
||||
])) as ReadableStreamReadResult<Uint8Array>;
|
||||
|
||||
if (result.done) break;
|
||||
if (result.value) {
|
||||
const chunk =
|
||||
result.value instanceof Uint8Array
|
||||
? decoder.decode(result.value)
|
||||
: String(result.value);
|
||||
chunks.push(chunk);
|
||||
readCount++;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Timeout or error
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
|
||||
// Verify exact stream format - only events with timestamp > 200 are included (new SSE format)
|
||||
// Note: indices are based on original array position, not filtered position
|
||||
expect(chunks).toEqual([
|
||||
`id: conn_${MOCK_TIMESTAMP}
|
||||
event: connected
|
||||
data: {"lastEventId":"200","operationId":"test-operation","timestamp":${MOCK_TIMESTAMP},"type":"connected"}
|
||||
|
||||
`,
|
||||
`id: test-operation
|
||||
event: stream_chunk
|
||||
data: {"type":"stream_chunk","timestamp":250,"operationId":"test-operation","data":{"content":"world"}}
|
||||
|
||||
`,
|
||||
`id: test-operation
|
||||
event: stream_end
|
||||
data: {"type":"stream_end","timestamp":300,"operationId":"test-operation","data":{"messageId":"msg3"}}
|
||||
\n`,
|
||||
]);
|
||||
|
||||
// Verify API calls
|
||||
expect(mockStreamEventManager.getStreamHistory).toHaveBeenCalledWith('test-operation', 50);
|
||||
});
|
||||
|
||||
it('should handle errors with exact error event format', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation&includeHistory=true',
|
||||
);
|
||||
|
||||
// Mock getStreamHistory to reject
|
||||
mockStreamEventManager.getStreamHistory.mockRejectedValue(
|
||||
new Error('Redis connection failed'),
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
const decoder = new TextDecoder();
|
||||
const reader = response.body!.getReader();
|
||||
|
||||
// Collect all chunks
|
||||
const chunks = [];
|
||||
let readCount = 0;
|
||||
const maxReads = 2; // connection + error event
|
||||
|
||||
try {
|
||||
while (readCount < maxReads) {
|
||||
const readPromise = reader.read();
|
||||
const timeoutPromise = new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Read timeout')), 500),
|
||||
);
|
||||
|
||||
const result = (await Promise.race([
|
||||
readPromise,
|
||||
timeoutPromise,
|
||||
])) as ReadableStreamReadResult<Uint8Array>;
|
||||
|
||||
if (result.done) break;
|
||||
if (result.value) {
|
||||
const chunk =
|
||||
result.value instanceof Uint8Array
|
||||
? decoder.decode(result.value)
|
||||
: String(result.value);
|
||||
chunks.push(chunk);
|
||||
readCount++;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Timeout or error
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
|
||||
// Verify exact stream format - connection event + error event (new SSE format)
|
||||
// Parse error event to check format (error includes stack trace dynamically)
|
||||
const errorChunk = chunks[1];
|
||||
expect(errorChunk).toMatch(/^id: error_\d+\nevent: error\ndata: \{.*"type":"error".*\}\n\n$/);
|
||||
expect(errorChunk).toContain('"error":"Redis connection failed"');
|
||||
expect(errorChunk).toContain('"phase":"history_loading"');
|
||||
expect(errorChunk).toContain('"operationId":"test-operation"');
|
||||
expect(errorChunk).toContain(`"timestamp":${MOCK_TIMESTAMP}`);
|
||||
|
||||
// Verify connection event format
|
||||
expect(chunks[0]).toEqual(
|
||||
`id: conn_${MOCK_TIMESTAMP}\nevent: connected\ndata: {"lastEventId":"0","operationId":"test-operation","timestamp":${MOCK_TIMESTAMP},"type":"connected"}\n\n`,
|
||||
);
|
||||
|
||||
// Verify getStreamHistory was called
|
||||
expect(mockStreamEventManager.getStreamHistory).toHaveBeenCalledWith('test-operation', 50);
|
||||
});
|
||||
|
||||
it('should verify stream subscription with exact parameters', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation&lastEventId=456',
|
||||
);
|
||||
|
||||
mockStreamEventManager.subscribeStreamEvents.mockResolvedValue(undefined);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
|
||||
// Verify exact parameter passing
|
||||
expect(mockStreamEventManager.subscribeStreamEvents).toHaveBeenCalledWith(
|
||||
'test-operation',
|
||||
'456',
|
||||
expect.any(Function), // callback function
|
||||
expect.any(AbortSignal), // abort signal
|
||||
);
|
||||
|
||||
// Verify the callback function structure
|
||||
const callArgs = mockStreamEventManager.subscribeStreamEvents.mock.calls[0];
|
||||
expect(callArgs).toHaveLength(4);
|
||||
expect(typeof callArgs[2]).toBe('function'); // callback
|
||||
expect(callArgs[3]).toBeInstanceOf(AbortSignal); // signal
|
||||
});
|
||||
|
||||
it('should verify default parameters with exact values', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation',
|
||||
);
|
||||
|
||||
mockStreamEventManager.subscribeStreamEvents.mockResolvedValue(undefined);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
|
||||
// Verify default values are used
|
||||
expect(mockStreamEventManager.subscribeStreamEvents).toHaveBeenCalledWith(
|
||||
'test-operation',
|
||||
'0', // default lastEventId
|
||||
expect.any(Function),
|
||||
expect.any(AbortSignal),
|
||||
);
|
||||
|
||||
// Verify getStreamHistory is NOT called when includeHistory defaults to false
|
||||
expect(mockStreamEventManager.getStreamHistory).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should verify SSE message structure with exact format specification', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation',
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
const decoder = new TextDecoder();
|
||||
const reader = response.body!.getReader();
|
||||
|
||||
// Collect all chunks
|
||||
const chunks = [];
|
||||
let readCount = 0;
|
||||
const maxReads = 1; // Only read connection event
|
||||
|
||||
try {
|
||||
while (readCount < maxReads) {
|
||||
const readPromise = reader.read();
|
||||
const timeoutPromise = new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Read timeout')), 1000),
|
||||
);
|
||||
|
||||
const result = (await Promise.race([
|
||||
readPromise,
|
||||
timeoutPromise,
|
||||
])) as ReadableStreamReadResult<Uint8Array>;
|
||||
|
||||
if (result.done) break;
|
||||
if (result.value) {
|
||||
const chunk =
|
||||
result.value instanceof Uint8Array
|
||||
? decoder.decode(result.value)
|
||||
: String(result.value);
|
||||
chunks.push(chunk);
|
||||
readCount++;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Timeout or error
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
|
||||
// Verify exact stream format with default lastEventId (new SSE format)
|
||||
expect(chunks).toEqual([
|
||||
`id: conn_${MOCK_TIMESTAMP}\nevent: connected\ndata: {"lastEventId":"0","operationId":"test-operation","timestamp":${MOCK_TIMESTAMP},"type":"connected"}\n\n`,
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Agent Runtime Lifecycle', () => {
|
||||
it('should verify agent runtime event handling and connection closure logic', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation',
|
||||
);
|
||||
|
||||
// Capture the event callback so we can test the event processing logic directly
|
||||
let capturedCallback: ((events: any[]) => void) | null = null;
|
||||
let capturedSignal: AbortSignal | null = null;
|
||||
|
||||
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
|
||||
(operationId, lastEventId, callback, signal) => {
|
||||
capturedCallback = callback;
|
||||
capturedSignal = signal;
|
||||
return Promise.resolve();
|
||||
},
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
// Verify the subscription was set up correctly
|
||||
expect(mockStreamEventManager.subscribeStreamEvents).toHaveBeenCalledWith(
|
||||
'test-operation',
|
||||
'0',
|
||||
expect.any(Function),
|
||||
expect.any(AbortSignal),
|
||||
);
|
||||
expect(capturedCallback).toBeDefined();
|
||||
expect(capturedSignal).toBeDefined();
|
||||
|
||||
// Verify response headers are correct
|
||||
expect(response.status).toBe(200);
|
||||
expect(response.headers.get('Content-Type')).toBe('text/event-stream');
|
||||
|
||||
// Test that the callback exists and can be called
|
||||
expect(typeof capturedCallback).toBe('function');
|
||||
expect(capturedSignal).toBeInstanceOf(AbortSignal);
|
||||
});
|
||||
|
||||
it('should verify subscribeStreamEvents callback can handle agent_runtime_init events', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation',
|
||||
);
|
||||
|
||||
let capturedCallback: ((events: any[]) => void) | null = null;
|
||||
|
||||
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
|
||||
(operationId, lastEventId, callback, _signal) => {
|
||||
capturedCallback = callback;
|
||||
return Promise.resolve();
|
||||
},
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
// Verify we captured the callback
|
||||
expect(capturedCallback).toBeDefined();
|
||||
expect(response.status).toBe(200);
|
||||
|
||||
// Test agent_runtime_init event processing
|
||||
const initEvent = {
|
||||
type: 'agent_runtime_init',
|
||||
timestamp: MOCK_TIMESTAMP + 100,
|
||||
operationId: 'test-operation',
|
||||
data: {
|
||||
userId: 'user-123',
|
||||
modelConfig: { model: 'gpt-4', temperature: 0.7 },
|
||||
agentType: 'assistant',
|
||||
},
|
||||
};
|
||||
|
||||
// The callback should be callable without throwing errors
|
||||
expect(() => capturedCallback!([initEvent])).not.toThrow();
|
||||
});
|
||||
|
||||
it('should verify subscribeStreamEvents callback can handle agent_runtime_end events', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation',
|
||||
);
|
||||
|
||||
let capturedCallback: ((events: any[]) => void) | null = null;
|
||||
|
||||
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
|
||||
(operationId, lastEventId, callback, _signal) => {
|
||||
capturedCallback = callback;
|
||||
return Promise.resolve();
|
||||
},
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
// Verify we captured the callback
|
||||
expect(capturedCallback).toBeDefined();
|
||||
expect(response.status).toBe(200);
|
||||
|
||||
// Test agent_runtime_end event processing
|
||||
const endEvent = {
|
||||
type: 'agent_runtime_end',
|
||||
timestamp: MOCK_TIMESTAMP + 600,
|
||||
operationId: 'test-operation',
|
||||
data: {
|
||||
totalSteps: 1,
|
||||
executionTime: 500,
|
||||
status: 'completed',
|
||||
},
|
||||
};
|
||||
|
||||
// The callback should be callable without throwing errors
|
||||
expect(() => capturedCallback!([endEvent])).not.toThrow();
|
||||
});
|
||||
|
||||
it('should verify complete agent runtime lifecycle event types are supported', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation',
|
||||
);
|
||||
|
||||
let capturedCallback: ((events: any[]) => void) | null = null;
|
||||
|
||||
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
|
||||
(operationId, lastEventId, callback, _signal) => {
|
||||
capturedCallback = callback;
|
||||
return Promise.resolve();
|
||||
},
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
expect(capturedCallback).toBeDefined();
|
||||
expect(response.status).toBe(200);
|
||||
|
||||
// Test complete lifecycle events can be processed
|
||||
const lifecycleEvents = [
|
||||
{
|
||||
type: 'agent_runtime_init',
|
||||
timestamp: MOCK_TIMESTAMP + 100,
|
||||
operationId: 'test-operation',
|
||||
data: { userId: 'user-123', agentType: 'assistant' },
|
||||
},
|
||||
{
|
||||
type: 'stream_start',
|
||||
timestamp: MOCK_TIMESTAMP + 200,
|
||||
operationId: 'test-operation',
|
||||
data: { messageId: 'msg-001' },
|
||||
},
|
||||
{
|
||||
type: 'stream_chunk',
|
||||
timestamp: MOCK_TIMESTAMP + 300,
|
||||
operationId: 'test-operation',
|
||||
data: { content: 'Hello world' },
|
||||
},
|
||||
{
|
||||
type: 'stream_end',
|
||||
timestamp: MOCK_TIMESTAMP + 400,
|
||||
operationId: 'test-operation',
|
||||
data: { messageId: 'msg-001' },
|
||||
},
|
||||
{
|
||||
type: 'agent_runtime_end',
|
||||
timestamp: MOCK_TIMESTAMP + 500,
|
||||
operationId: 'test-operation',
|
||||
data: { status: 'completed', totalSteps: 1 },
|
||||
},
|
||||
];
|
||||
|
||||
// All lifecycle events should be processable without throwing errors
|
||||
expect(() => capturedCallback!(lifecycleEvents)).not.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
describe('Heartbeat and connection lifecycle', () => {
|
||||
it('should close connection immediately after agent_runtime_end', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation',
|
||||
);
|
||||
|
||||
let capturedCallback: ((events: any[]) => void) | null = null;
|
||||
let capturedSignal: AbortSignal | null = null;
|
||||
|
||||
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
|
||||
(operationId, lastEventId, callback, signal) => {
|
||||
capturedCallback = callback;
|
||||
capturedSignal = signal;
|
||||
return new Promise(() => {});
|
||||
},
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
expect(capturedCallback).toBeDefined();
|
||||
expect(capturedSignal).toBeDefined();
|
||||
|
||||
// Signal should not be aborted initially
|
||||
expect(capturedSignal!.aborted).toBe(false);
|
||||
|
||||
// Simulate agent_runtime_end event
|
||||
const endEvent = {
|
||||
type: 'agent_runtime_end',
|
||||
timestamp: MOCK_TIMESTAMP + 1000,
|
||||
operationId: 'test-operation',
|
||||
data: { status: 'completed' },
|
||||
};
|
||||
capturedCallback!([endEvent]);
|
||||
|
||||
// Signal should be aborted immediately after agent_runtime_end
|
||||
expect(capturedSignal!.aborted).toBe(true);
|
||||
});
|
||||
|
||||
it('should set streamEnded flag and close connection when agent_runtime_end is received', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation',
|
||||
);
|
||||
|
||||
let capturedCallback: ((events: any[]) => void) | null = null;
|
||||
let capturedSignal: AbortSignal | null = null;
|
||||
|
||||
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
|
||||
(operationId, lastEventId, callback, signal) => {
|
||||
capturedCallback = callback;
|
||||
capturedSignal = signal;
|
||||
return new Promise(() => {});
|
||||
},
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
expect(capturedCallback).toBeDefined();
|
||||
expect(capturedSignal).toBeDefined();
|
||||
|
||||
// Simulate agent_runtime_end event - this should set streamEnded = true
|
||||
const endEvent = {
|
||||
type: 'agent_runtime_end',
|
||||
timestamp: MOCK_TIMESTAMP + 1000,
|
||||
operationId: 'test-operation',
|
||||
data: { status: 'completed' },
|
||||
};
|
||||
|
||||
// This should not throw - verifies the callback can handle the event
|
||||
expect(() => capturedCallback!([endEvent])).not.toThrow();
|
||||
// Signal should be aborted (connection closed)
|
||||
expect(capturedSignal!.aborted).toBe(true);
|
||||
});
|
||||
|
||||
it('should handle agent_runtime_end event in callback without errors', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation',
|
||||
);
|
||||
|
||||
let capturedCallback: ((events: any[]) => void) | null = null;
|
||||
|
||||
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
|
||||
(operationId, lastEventId, callback, _signal) => {
|
||||
capturedCallback = callback;
|
||||
return new Promise(() => {});
|
||||
},
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
expect(capturedCallback).toBeDefined();
|
||||
|
||||
// Simulate agent_runtime_end with full data
|
||||
const endEvent = {
|
||||
type: 'agent_runtime_end',
|
||||
timestamp: MOCK_TIMESTAMP + 1000,
|
||||
operationId: 'test-operation',
|
||||
data: {
|
||||
finalState: { status: 'completed' },
|
||||
reason: 'completed',
|
||||
reasonDetail: 'Agent runtime completed successfully',
|
||||
},
|
||||
};
|
||||
|
||||
// Verify the event is processed without throwing
|
||||
expect(() => capturedCallback!([endEvent])).not.toThrow();
|
||||
});
|
||||
|
||||
it('should handle batch events including agent_runtime_end without errors', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation',
|
||||
);
|
||||
|
||||
let capturedCallback: ((events: any[]) => void) | null = null;
|
||||
|
||||
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
|
||||
(operationId, lastEventId, callback, _signal) => {
|
||||
capturedCallback = callback;
|
||||
return new Promise(() => {});
|
||||
},
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
expect(capturedCallback).toBeDefined();
|
||||
|
||||
// Simulate batch of events ending with agent_runtime_end
|
||||
const batchEvents = [
|
||||
{
|
||||
type: 'stream_chunk',
|
||||
timestamp: MOCK_TIMESTAMP + 800,
|
||||
operationId: 'test-operation',
|
||||
data: { content: 'Final chunk' },
|
||||
},
|
||||
{
|
||||
type: 'stream_end',
|
||||
timestamp: MOCK_TIMESTAMP + 900,
|
||||
operationId: 'test-operation',
|
||||
data: { messageId: 'msg-001' },
|
||||
},
|
||||
{
|
||||
type: 'agent_runtime_end',
|
||||
timestamp: MOCK_TIMESTAMP + 1000,
|
||||
operationId: 'test-operation',
|
||||
data: { status: 'completed' },
|
||||
},
|
||||
];
|
||||
|
||||
// All events should be processed without throwing
|
||||
expect(() => capturedCallback!(batchEvents)).not.toThrow();
|
||||
});
|
||||
|
||||
it('should skip events after streamEnded flag is set', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test-operation',
|
||||
);
|
||||
|
||||
let capturedCallback: ((events: any[]) => void) | null = null;
|
||||
let capturedSignal: AbortSignal | null = null;
|
||||
|
||||
mockStreamEventManager.subscribeStreamEvents.mockImplementation(
|
||||
(operationId, lastEventId, callback, signal) => {
|
||||
capturedCallback = callback;
|
||||
capturedSignal = signal;
|
||||
return new Promise(() => {});
|
||||
},
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
expect(capturedCallback).toBeDefined();
|
||||
expect(capturedSignal).toBeDefined();
|
||||
expect(capturedSignal!.aborted).toBe(false);
|
||||
|
||||
// First, send agent_runtime_end to set streamEnded = true
|
||||
capturedCallback!([
|
||||
{
|
||||
type: 'agent_runtime_end',
|
||||
timestamp: MOCK_TIMESTAMP + 1000,
|
||||
operationId: 'test-operation',
|
||||
data: { status: 'completed' },
|
||||
},
|
||||
]);
|
||||
|
||||
// Signal should be aborted immediately
|
||||
expect(capturedSignal!.aborted).toBe(true);
|
||||
|
||||
// Any subsequent events should be skipped (no errors)
|
||||
expect(() =>
|
||||
capturedCallback!([
|
||||
{
|
||||
type: 'step_complete',
|
||||
timestamp: MOCK_TIMESTAMP + 1100,
|
||||
operationId: 'test-operation',
|
||||
data: { stepIndex: 1 },
|
||||
},
|
||||
]),
|
||||
).not.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
describe('Parameter validation', () => {
|
||||
it('should handle operationId with special characters', async () => {
|
||||
const operationId = 'test-operation-123_456';
|
||||
const request = new NextRequest(
|
||||
`https://test.com/api/agent/stream?operationId=${operationId}`,
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
});
|
||||
|
||||
it('should handle lastEventId as string number', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test&lastEventId=12345',
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
});
|
||||
|
||||
it('should handle includeHistory as string boolean', async () => {
|
||||
const request = new NextRequest(
|
||||
'https://test.com/api/agent/stream?operationId=test&includeHistory=false',
|
||||
);
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
expect(mockStreamEventManager.getStreamHistory).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should handle invalid URL gracefully', async () => {
|
||||
const request = new NextRequest('https://test.com/api/agent/stream?operationId=');
|
||||
|
||||
const response = await GET(request);
|
||||
|
||||
expect(response.status).toBe(400);
|
||||
const data = await response.json();
|
||||
expect(data.error).toBe('operationId parameter is required');
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,213 +0,0 @@
|
||||
import { createSSEHeaders, createSSEWriter } from '@lobechat/utils/server';
|
||||
import debug from 'debug';
|
||||
import { type NextRequest } from 'next/server';
|
||||
import { NextResponse } from 'next/server';
|
||||
|
||||
import { createStreamEventManager } from '@/server/modules/AgentRuntime';
|
||||
|
||||
const log = debug('api-route:agent:stream');
|
||||
const timing = debug('lobe-server:agent-runtime:timing');
|
||||
|
||||
/**
|
||||
* Server-Sent Events (SSE) endpoint
|
||||
* Provides real-time Agent execution event stream for clients
|
||||
*/
|
||||
export async function GET(request: NextRequest) {
|
||||
// Initialize stream event manager (uses InMemory singleton in local dev, Redis in production)
|
||||
const streamManager = createStreamEventManager();
|
||||
|
||||
const { searchParams } = new URL(request.url);
|
||||
const operationId = searchParams.get('operationId');
|
||||
const lastEventId = searchParams.get('lastEventId') || '0';
|
||||
const includeHistory = searchParams.get('includeHistory') === 'true';
|
||||
|
||||
if (!operationId) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'operationId parameter is required',
|
||||
},
|
||||
{ status: 400 },
|
||||
);
|
||||
}
|
||||
|
||||
log(`Starting SSE connection for operation ${operationId} from eventId ${lastEventId}`);
|
||||
|
||||
// Create Server-Sent Events stream
|
||||
const stream = new ReadableStream({
|
||||
cancel(reason) {
|
||||
log(`SSE connection cancelled for operation ${operationId}:`, reason);
|
||||
|
||||
// Call cleanup function
|
||||
if ((this as any)._cleanup) {
|
||||
(this as any)._cleanup();
|
||||
}
|
||||
},
|
||||
|
||||
start(controller) {
|
||||
const writer = createSSEWriter(controller);
|
||||
|
||||
// Send connection confirmation event
|
||||
writer.writeConnection(operationId, lastEventId);
|
||||
log(`SSE connection established for operation ${operationId}`);
|
||||
|
||||
// If needed, send historical events first
|
||||
if (includeHistory) {
|
||||
streamManager
|
||||
.getStreamHistory(operationId, 50)
|
||||
.then((history) => {
|
||||
// Send historical events in chronological order (earliest first)
|
||||
const sortedHistory = history.reverse();
|
||||
|
||||
sortedHistory.forEach((event) => {
|
||||
// Only send events newer than lastEventId
|
||||
if (!lastEventId || lastEventId === '0' || event.timestamp.toString() > lastEventId) {
|
||||
try {
|
||||
// Add SSE-specific fields, keeping format consistent with real-time events
|
||||
const sseEvent = {
|
||||
...event,
|
||||
operationId,
|
||||
timestamp: event.timestamp || Date.now(),
|
||||
};
|
||||
writer.writeStreamEvent(sseEvent, operationId);
|
||||
} catch (error) {
|
||||
console.error('[Agent Stream] Error sending history event:', error);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (sortedHistory.length > 0) {
|
||||
log(`Sent ${sortedHistory.length} historical events for operation ${operationId}`);
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
console.error('[Agent Stream] Failed to load history:', error);
|
||||
|
||||
try {
|
||||
writer.writeError(error, operationId, 'history_loading');
|
||||
} catch (controllerError) {
|
||||
console.error('[Agent Stream] Failed to send error event:', controllerError);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Create AbortController for canceling subscription
|
||||
const abortController = new AbortController();
|
||||
|
||||
// Track if stream has ended (agent_runtime_end received)
|
||||
// Once set to true, no more events will be sent
|
||||
let streamEnded = false;
|
||||
|
||||
// Send heartbeat periodically (every 30 seconds)
|
||||
const heartbeatInterval = setInterval(() => {
|
||||
// Skip heartbeat if stream has ended
|
||||
if (streamEnded) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const heartbeat = {
|
||||
operationId,
|
||||
timestamp: Date.now(),
|
||||
type: 'heartbeat',
|
||||
};
|
||||
|
||||
controller.enqueue(`data: ${JSON.stringify(heartbeat)}\n\n`);
|
||||
} catch (error) {
|
||||
console.error('[Agent Stream] Heartbeat error:', error);
|
||||
clearInterval(heartbeatInterval);
|
||||
}
|
||||
}, 30_000);
|
||||
|
||||
// Cleanup function
|
||||
const cleanup = () => {
|
||||
abortController.abort();
|
||||
clearInterval(heartbeatInterval);
|
||||
log(`SSE connection closed for operation ${operationId}`);
|
||||
};
|
||||
|
||||
// Subscribe to new streaming events
|
||||
const subscribeToEvents = async () => {
|
||||
try {
|
||||
await streamManager.subscribeStreamEvents(
|
||||
operationId,
|
||||
lastEventId,
|
||||
(events) => {
|
||||
events.forEach((event) => {
|
||||
// Skip all events if stream has ended
|
||||
if (streamEnded) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// Add SSE-specific fields
|
||||
const sseEvent = {
|
||||
...event,
|
||||
operationId,
|
||||
timestamp: event.timestamp || Date.now(),
|
||||
};
|
||||
|
||||
const now = Date.now();
|
||||
const totalLatency = now - sseEvent.timestamp;
|
||||
writer.writeStreamEvent(sseEvent, operationId);
|
||||
timing(
|
||||
'[%s:%d] SSE sent %s, original timestamp %d, sent at %d, total latency %dms',
|
||||
operationId,
|
||||
event.stepIndex,
|
||||
event.type,
|
||||
sseEvent.timestamp,
|
||||
now,
|
||||
totalLatency,
|
||||
);
|
||||
|
||||
// If agent_runtime_end event is received, terminate stream immediately
|
||||
if (event.type === 'agent_runtime_end') {
|
||||
log(
|
||||
`Agent runtime ended for operation ${operationId}, terminating stream immediately`,
|
||||
);
|
||||
|
||||
// Mark stream as ended to prevent any more events
|
||||
streamEnded = true;
|
||||
|
||||
// Immediately cleanup and close connection
|
||||
cleanup();
|
||||
controller.close();
|
||||
log(
|
||||
`SSE connection closed after agent runtime end for operation ${operationId}`,
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[Agent Stream] Error sending event:', error);
|
||||
}
|
||||
});
|
||||
},
|
||||
abortController.signal,
|
||||
);
|
||||
} catch (error) {
|
||||
if (!abortController.signal.aborted) {
|
||||
console.error('[Agent Stream] Subscription error:', error);
|
||||
|
||||
try {
|
||||
writer.writeError(error as Error, operationId, 'stream_subscription');
|
||||
} catch (controllerError) {
|
||||
console.error('[Agent Stream] Failed to send subscription error:', controllerError);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Start subscription
|
||||
subscribeToEvents();
|
||||
|
||||
// Listen for connection close
|
||||
request.signal?.addEventListener('abort', cleanup);
|
||||
|
||||
// Store cleanup function for calling during cancel
|
||||
(controller as any)._cleanup = cleanup;
|
||||
},
|
||||
});
|
||||
|
||||
// Set SSE response headers
|
||||
return new Response(stream, {
|
||||
headers: createSSEHeaders(),
|
||||
});
|
||||
}
|
||||
@@ -1,228 +0,0 @@
|
||||
// @vitest-environment happy-dom
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { agentRuntimeClient } from '../client';
|
||||
|
||||
// Mock fetchEventSource
|
||||
const mockFetchEventSource = vi.fn();
|
||||
vi.mock('@lobechat/utils/client', () => ({
|
||||
fetchEventSource: (url: string, options: any) => mockFetchEventSource(url, options),
|
||||
}));
|
||||
|
||||
describe('AgentRuntimeClient', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('createStreamConnection', () => {
|
||||
it('should create connection with correct URL and parameters', () => {
|
||||
const operationId = 'agent_1758302563222_0g28qmdmu';
|
||||
|
||||
agentRuntimeClient.createStreamConnection(operationId, {
|
||||
includeHistory: false,
|
||||
lastEventId: '0',
|
||||
});
|
||||
|
||||
expect(mockFetchEventSource).toHaveBeenCalledWith(
|
||||
'/api/agent/stream?includeHistory=false&lastEventId=0&operationId=agent_1758302563222_0g28qmdmu',
|
||||
expect.objectContaining({
|
||||
headers: {
|
||||
'Cache-Control': 'no-cache',
|
||||
'Last-Event-ID': '0',
|
||||
},
|
||||
signal: expect.any(AbortSignal),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should handle complete agent runtime lifecycle with real stream data', async () => {
|
||||
const operationId = 'agent_1758302563222_abc';
|
||||
const events: any[] = [];
|
||||
let connectCalled = false;
|
||||
let disconnectCalled = false;
|
||||
|
||||
// Capture the callbacks passed to fetchEventSource
|
||||
mockFetchEventSource.mockImplementation((url: string, options: any) => {
|
||||
// Simulate connection opening
|
||||
setTimeout(() => {
|
||||
options.onopen?.({ ok: true, status: 200, statusText: 'OK' });
|
||||
}, 10);
|
||||
|
||||
// Simulate receiving events
|
||||
setTimeout(() => {
|
||||
const streamEvents = [
|
||||
`{"lastEventId":"0","operationId":"${operationId}","timestamp":1758302567925,"type":"connected"}`,
|
||||
`{"type":"agent_runtime_init","stepIndex":0,"operationId":"${operationId}","data":{"agentConfig":{"enableSearch":true}},"timestamp":1758302564421}`,
|
||||
`{"type":"stream_start","stepIndex":0,"operationId":"${operationId}","data":{"messageId":"msg1","model":"gpt-4"},"timestamp":1758302574552}`,
|
||||
`{"type":"stream_chunk","stepIndex":0,"operationId":"${operationId}","data":{"content":"Hello"},"timestamp":1758302578042}`,
|
||||
`{"type":"stream_end","stepIndex":0,"operationId":"${operationId}","data":{"finalContent":"Hello world"},"timestamp":1758302626595}`,
|
||||
`{"type":"agent_runtime_end","stepIndex":0,"operationId":"${operationId}","data":{"status":"completed"},"timestamp":1758302631030}`,
|
||||
];
|
||||
|
||||
streamEvents.forEach((eventData) => {
|
||||
options.onmessage?.({ data: eventData, event: 'message' });
|
||||
});
|
||||
}, 20);
|
||||
});
|
||||
|
||||
agentRuntimeClient.createStreamConnection(operationId, {
|
||||
includeHistory: false,
|
||||
onConnect: () => {
|
||||
connectCalled = true;
|
||||
},
|
||||
onDisconnect: () => {
|
||||
disconnectCalled = true;
|
||||
},
|
||||
onEvent: (event) => {
|
||||
events.push(event);
|
||||
},
|
||||
});
|
||||
|
||||
// Wait for all events
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
expect(connectCalled).toBe(true);
|
||||
expect(events).toHaveLength(6);
|
||||
|
||||
// Verify event sequence
|
||||
expect(events[0].type).toBe('connected');
|
||||
expect(events[1].type).toBe('agent_runtime_init');
|
||||
expect(events[2].type).toBe('stream_start');
|
||||
expect(events[3].type).toBe('stream_chunk');
|
||||
expect(events[4].type).toBe('stream_end');
|
||||
expect(events[5].type).toBe('agent_runtime_end');
|
||||
|
||||
// Verify operationId consistency
|
||||
events.forEach((event) => {
|
||||
expect(event.operationId).toBe(operationId);
|
||||
});
|
||||
});
|
||||
|
||||
it('should handle heartbeat events correctly', async () => {
|
||||
const operationId = 'test-operation';
|
||||
const events: any[] = [];
|
||||
|
||||
mockFetchEventSource.mockImplementation((url: string, options: any) => {
|
||||
setTimeout(() => {
|
||||
const heartbeatData = `{"operationId":"${operationId}","timestamp":1758302597927,"type":"heartbeat"}`;
|
||||
options.onmessage?.({ data: heartbeatData, event: 'message' });
|
||||
}, 10);
|
||||
});
|
||||
|
||||
agentRuntimeClient.createStreamConnection(operationId, {
|
||||
onEvent: (event) => {
|
||||
events.push(event);
|
||||
},
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 30));
|
||||
|
||||
expect(events).toHaveLength(1);
|
||||
expect(events[0].type).toBe('heartbeat');
|
||||
expect(events[0].operationId).toBe(operationId);
|
||||
});
|
||||
|
||||
it('should handle connection errors', async () => {
|
||||
const operationId = 'test-operation';
|
||||
let errorOccurred = false;
|
||||
|
||||
mockFetchEventSource.mockImplementation((url: string, options: any) => {
|
||||
setTimeout(() => {
|
||||
options.onerror?.(new Error('Connection failed'));
|
||||
}, 10);
|
||||
});
|
||||
|
||||
agentRuntimeClient.createStreamConnection(operationId, {
|
||||
onError: (error) => {
|
||||
errorOccurred = true;
|
||||
expect(error.message).toBe('Connection failed');
|
||||
},
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 30));
|
||||
|
||||
expect(errorOccurred).toBe(true);
|
||||
});
|
||||
|
||||
it('should handle malformed JSON gracefully', async () => {
|
||||
const operationId = 'test-operation';
|
||||
const events: any[] = [];
|
||||
let errorOccurred = false;
|
||||
|
||||
mockFetchEventSource.mockImplementation((url: string, options: any) => {
|
||||
setTimeout(() => {
|
||||
options.onmessage?.({ data: 'invalid json', event: 'message' });
|
||||
}, 10);
|
||||
});
|
||||
|
||||
agentRuntimeClient.createStreamConnection(operationId, {
|
||||
onEvent: (event) => {
|
||||
events.push(event);
|
||||
},
|
||||
onError: (error) => {
|
||||
errorOccurred = true;
|
||||
},
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 30));
|
||||
|
||||
expect(events).toHaveLength(0);
|
||||
expect(errorOccurred).toBe(true);
|
||||
});
|
||||
|
||||
it('should call onDisconnect when connection is closed', async () => {
|
||||
const operationId = 'test-operation';
|
||||
let disconnectCalled = false;
|
||||
|
||||
mockFetchEventSource.mockImplementation((url: string, options: any) => {
|
||||
setTimeout(() => {
|
||||
options.onclose?.();
|
||||
}, 10);
|
||||
});
|
||||
|
||||
agentRuntimeClient.createStreamConnection(operationId, {
|
||||
onDisconnect: () => {
|
||||
disconnectCalled = true;
|
||||
},
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 30));
|
||||
|
||||
expect(disconnectCalled).toBe(true);
|
||||
});
|
||||
|
||||
it('should include correct parameters in URL', () => {
|
||||
const operationId = 'test-operation-123';
|
||||
|
||||
agentRuntimeClient.createStreamConnection(operationId, {
|
||||
includeHistory: true,
|
||||
lastEventId: '12345',
|
||||
});
|
||||
|
||||
expect(mockFetchEventSource).toHaveBeenCalledWith(
|
||||
'/api/agent/stream?includeHistory=true&lastEventId=12345&operationId=test-operation-123',
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
|
||||
it('should use default parameters when not provided', () => {
|
||||
const operationId = 'test-operation';
|
||||
|
||||
agentRuntimeClient.createStreamConnection(operationId);
|
||||
|
||||
expect(mockFetchEventSource).toHaveBeenCalledWith(
|
||||
'/api/agent/stream?includeHistory=false&lastEventId=0&operationId=test-operation',
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
|
||||
it('should return AbortController for cancellation', () => {
|
||||
const operationId = 'test-operation';
|
||||
|
||||
const controller = agentRuntimeClient.createStreamConnection(operationId);
|
||||
|
||||
expect(controller).toBeInstanceOf(AbortController);
|
||||
expect(controller.signal).toBeInstanceOf(AbortSignal);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,77 +0,0 @@
|
||||
import { fetchEventSource } from '@lobechat/utils/client';
|
||||
import debug from 'debug';
|
||||
|
||||
import { type StreamConnectionOptions, type StreamEvent } from './type';
|
||||
|
||||
const log = debug('lobe-agent-runtime:client');
|
||||
|
||||
/**
|
||||
* Agent Client Service for communicating with durable agents
|
||||
*/
|
||||
class AgentRuntimeClient {
|
||||
private baseUrl = '/api/agent';
|
||||
|
||||
/**
|
||||
* Create a streaming connection to receive real-time agent events
|
||||
*/
|
||||
createStreamConnection(
|
||||
operationId: string,
|
||||
options: StreamConnectionOptions = {},
|
||||
): AbortController {
|
||||
const {
|
||||
includeHistory = false,
|
||||
lastEventId = '0',
|
||||
onEvent,
|
||||
onError,
|
||||
onConnect,
|
||||
onDisconnect,
|
||||
} = options;
|
||||
|
||||
const params = new URLSearchParams({
|
||||
includeHistory: includeHistory.toString(),
|
||||
lastEventId,
|
||||
operationId,
|
||||
});
|
||||
|
||||
const controller = new AbortController();
|
||||
|
||||
fetchEventSource(`${this.baseUrl}/stream?${params}`, {
|
||||
headers: {
|
||||
'Cache-Control': 'no-cache',
|
||||
'Last-Event-ID': lastEventId,
|
||||
},
|
||||
onclose: () => {
|
||||
log(`Stream connection closed for operation ${operationId}`);
|
||||
onDisconnect?.();
|
||||
},
|
||||
onerror: (error) => {
|
||||
console.error(`[AgentClientService] Stream error for operation ${operationId}:`, error);
|
||||
onError?.(error instanceof Error ? error : new Error('Stream connection error'));
|
||||
},
|
||||
onmessage: (event) => {
|
||||
try {
|
||||
const data = JSON.parse(event.data) as StreamEvent;
|
||||
log(`Received event: ${event.event || 'message'}`, event.data);
|
||||
|
||||
onEvent?.(data);
|
||||
} catch (error) {
|
||||
console.error('[AgentClientService] Failed to parse stream event:', error);
|
||||
onError?.(new Error('Failed to parse stream event'));
|
||||
}
|
||||
},
|
||||
onopen: async (response) => {
|
||||
if (response.ok) {
|
||||
log(`Stream connection opened for operation ${operationId}`);
|
||||
onConnect?.();
|
||||
} else {
|
||||
throw new Error(`Failed to open stream: ${response.status} ${response.statusText}`);
|
||||
}
|
||||
},
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
return controller;
|
||||
}
|
||||
}
|
||||
|
||||
export const agentRuntimeClient = new AgentRuntimeClient();
|
||||
@@ -7,7 +7,6 @@ import { contextEngineering } from '@/services/chat/mecha';
|
||||
import { getAgentStoreState } from '@/store/agent';
|
||||
import { agentChatConfigSelectors, agentSelectors } from '@/store/agent/selectors';
|
||||
|
||||
export { agentRuntimeClient } from './client';
|
||||
export * from './type';
|
||||
|
||||
interface AgentOperationRequest {
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import { act, renderHook } from '@testing-library/react';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { lambdaClient } from '@/libs/trpc/client';
|
||||
import { agentRuntimeClient } from '@/services/agentRuntime';
|
||||
import { useChatStore } from '@/store/chat/store';
|
||||
|
||||
// Keep zustand mock as it's needed globally
|
||||
@@ -24,14 +23,35 @@ vi.mock('@/libs/trpc/client', () => ({
|
||||
},
|
||||
}));
|
||||
|
||||
// Mock agentRuntimeClient
|
||||
vi.mock('@/services/agentRuntime', () => ({
|
||||
agentRuntimeClient: {
|
||||
createStreamConnection: vi.fn(),
|
||||
},
|
||||
StreamEvent: {},
|
||||
// Mock AgentGatewayClient
|
||||
vi.mock('@lobechat/agent-gateway-client', () => ({
|
||||
AgentGatewayClient: vi.fn().mockImplementation(() => ({
|
||||
connect: vi.fn(),
|
||||
disconnect: vi.fn(),
|
||||
on: vi.fn(),
|
||||
off: vi.fn(),
|
||||
sendInterrupt: vi.fn(),
|
||||
sendToolConfirmation: vi.fn(),
|
||||
sendUserInput: vi.fn(),
|
||||
connectionStatus: 'disconnected',
|
||||
})),
|
||||
}));
|
||||
|
||||
// Mock global_serverConfigStore on window
|
||||
const mockServerConfigStore = {
|
||||
getState: vi.fn().mockReturnValue({
|
||||
serverConfig: { agentGatewayUrl: 'https://test-gateway.example.com' },
|
||||
}),
|
||||
};
|
||||
|
||||
beforeAll(() => {
|
||||
(window as any).global_serverConfigStore = mockServerConfigStore;
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
delete (window as any).global_serverConfigStore;
|
||||
});
|
||||
|
||||
// Test Constants
|
||||
const TEST_IDS = {
|
||||
AGENT_ID: 'test-agent-id',
|
||||
@@ -134,6 +154,12 @@ describe('agentGroup actions', () => {
|
||||
onOperationCancel: vi.fn(),
|
||||
switchTopic: vi.fn(),
|
||||
associateMessageWithOperation: vi.fn(),
|
||||
isAgentGatewayAvailable: vi.fn().mockReturnValue(true),
|
||||
internal_connectAgentGateway: vi.fn().mockReturnValue({
|
||||
connect: vi.fn(),
|
||||
disconnect: vi.fn(),
|
||||
sendInterrupt: vi.fn(),
|
||||
}),
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -218,7 +244,7 @@ describe('agentGroup actions', () => {
|
||||
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
|
||||
createMockExecGroupAgentResponse(),
|
||||
);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
// Gateway client is auto-mocked
|
||||
|
||||
const context = createTestContext();
|
||||
|
||||
@@ -249,7 +275,7 @@ describe('agentGroup actions', () => {
|
||||
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
|
||||
createMockExecGroupAgentResponse(),
|
||||
);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
// Gateway client is auto-mocked
|
||||
|
||||
await act(async () => {
|
||||
await result.current.sendGroupMessage({
|
||||
@@ -291,7 +317,7 @@ describe('agentGroup actions', () => {
|
||||
capturedState = useChatStore.getState().isCreatingMessage;
|
||||
return createMockExecGroupAgentResponse();
|
||||
});
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
// Gateway client is auto-mocked
|
||||
|
||||
await act(async () => {
|
||||
await result.current.sendGroupMessage({
|
||||
@@ -313,7 +339,7 @@ describe('agentGroup actions', () => {
|
||||
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
|
||||
createMockExecGroupAgentResponse(),
|
||||
);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
// Gateway client is auto-mocked
|
||||
|
||||
const context = createTestContext({ topicId: TEST_IDS.TOPIC_ID });
|
||||
|
||||
@@ -342,7 +368,7 @@ describe('agentGroup actions', () => {
|
||||
|
||||
const mockResponse = createMockExecGroupAgentResponse();
|
||||
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(mockResponse);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
// Gateway client is auto-mocked
|
||||
|
||||
const context = createTestContext();
|
||||
|
||||
@@ -371,7 +397,7 @@ describe('agentGroup actions', () => {
|
||||
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
|
||||
createMockExecGroupAgentResponse(),
|
||||
);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
// Gateway client is auto-mocked
|
||||
|
||||
await act(async () => {
|
||||
await result.current.sendGroupMessage({
|
||||
@@ -394,13 +420,12 @@ describe('agentGroup actions', () => {
|
||||
);
|
||||
});
|
||||
|
||||
it('should register cancel handler for SSE stream', async () => {
|
||||
it('should connect to gateway with correct params when gateway is available', async () => {
|
||||
const { result } = renderHook(() => useChatStore());
|
||||
|
||||
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
|
||||
createMockExecGroupAgentResponse(),
|
||||
);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
|
||||
await act(async () => {
|
||||
await result.current.sendGroupMessage({
|
||||
@@ -409,9 +434,13 @@ describe('agentGroup actions', () => {
|
||||
});
|
||||
});
|
||||
|
||||
expect(result.current.onOperationCancel).toHaveBeenCalledWith(
|
||||
TEST_IDS.OPERATION_ID,
|
||||
expect.any(Function),
|
||||
expect(result.current.internal_connectAgentGateway).toHaveBeenCalledWith(
|
||||
TEST_IDS.OPERATION_ID, // chatKey = operationId
|
||||
expect.objectContaining({
|
||||
assistantId: TEST_IDS.ASSISTANT_MESSAGE_ID,
|
||||
execOperationId: 'op-exec',
|
||||
streamOperationId: TEST_IDS.OPERATION_ID,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
@@ -421,7 +450,7 @@ describe('agentGroup actions', () => {
|
||||
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
|
||||
createMockExecGroupAgentResponse(),
|
||||
);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
// Gateway client is auto-mocked
|
||||
|
||||
await act(async () => {
|
||||
await result.current.sendGroupMessage({
|
||||
@@ -446,13 +475,12 @@ describe('agentGroup actions', () => {
|
||||
expect(result.current.associateMessageWithOperation).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('should create stream connection with operationId', async () => {
|
||||
it('should connect to agent gateway with operationId as chatKey', async () => {
|
||||
const { result } = renderHook(() => useChatStore());
|
||||
|
||||
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
|
||||
createMockExecGroupAgentResponse(),
|
||||
);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
|
||||
await act(async () => {
|
||||
await result.current.sendGroupMessage({
|
||||
@@ -461,50 +489,15 @@ describe('agentGroup actions', () => {
|
||||
});
|
||||
});
|
||||
|
||||
expect(agentRuntimeClient.createStreamConnection).toHaveBeenCalledWith(
|
||||
TEST_IDS.OPERATION_ID,
|
||||
// Should have called internal_connectAgentGateway (which creates gateway client)
|
||||
expect(result.current.startOperation).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
includeHistory: false,
|
||||
onConnect: expect.any(Function),
|
||||
onDisconnect: expect.any(Function),
|
||||
onError: expect.any(Function),
|
||||
onEvent: expect.any(Function),
|
||||
operationId: TEST_IDS.OPERATION_ID,
|
||||
type: 'groupAgentStream',
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should complete operations on stream disconnect', async () => {
|
||||
const { result } = renderHook(() => useChatStore());
|
||||
|
||||
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
|
||||
createMockExecGroupAgentResponse(),
|
||||
);
|
||||
|
||||
let onDisconnectCallback: (() => void) | undefined;
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockImplementation(
|
||||
(_operationId, options) => {
|
||||
onDisconnectCallback = options?.onDisconnect;
|
||||
return {} as any;
|
||||
},
|
||||
);
|
||||
|
||||
await act(async () => {
|
||||
await result.current.sendGroupMessage({
|
||||
context: createTestContext(),
|
||||
message: TEST_CONTENT.GROUP_MESSAGE,
|
||||
});
|
||||
});
|
||||
|
||||
// Simulate stream disconnect
|
||||
await act(async () => {
|
||||
onDisconnectCallback?.();
|
||||
});
|
||||
|
||||
// Should complete both the stream operation and the main execServerAgentRuntime operation
|
||||
expect(result.current.completeOperation).toHaveBeenCalledWith(TEST_IDS.OPERATION_ID);
|
||||
expect(result.current.completeOperation).toHaveBeenCalledWith('op-exec');
|
||||
});
|
||||
|
||||
it('should update topics when new topic is created', async () => {
|
||||
const { result } = renderHook(() => useChatStore());
|
||||
|
||||
@@ -518,7 +511,7 @@ describe('agentGroup actions', () => {
|
||||
topics: mockTopics,
|
||||
}),
|
||||
);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
// Gateway client is auto-mocked
|
||||
|
||||
await act(async () => {
|
||||
await result.current.sendGroupMessage({
|
||||
@@ -546,7 +539,7 @@ describe('agentGroup actions', () => {
|
||||
topics: { items: [], total: 1 },
|
||||
}),
|
||||
);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
// Gateway client is auto-mocked
|
||||
|
||||
await act(async () => {
|
||||
await result.current.sendGroupMessage({
|
||||
@@ -569,7 +562,7 @@ describe('agentGroup actions', () => {
|
||||
isCreateNewTopic: false,
|
||||
}),
|
||||
);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
// Gateway client is auto-mocked
|
||||
|
||||
await act(async () => {
|
||||
await result.current.sendGroupMessage({
|
||||
@@ -710,7 +703,7 @@ describe('agentGroup actions', () => {
|
||||
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
|
||||
createMockExecGroupAgentResponse(),
|
||||
);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
// Gateway client is auto-mocked
|
||||
|
||||
await act(async () => {
|
||||
await result.current.sendGroupMessage({
|
||||
@@ -734,7 +727,7 @@ describe('agentGroup actions', () => {
|
||||
vi.mocked(lambdaClient.aiAgent.execGroupAgent.mutate).mockResolvedValue(
|
||||
createMockExecGroupAgentResponse(),
|
||||
);
|
||||
vi.mocked(agentRuntimeClient.createStreamConnection).mockReturnValue({} as any);
|
||||
// Gateway client is auto-mocked
|
||||
|
||||
await act(async () => {
|
||||
await result.current.sendGroupMessage({
|
||||
|
||||
@@ -5,8 +5,6 @@ import { nanoid } from '@lobechat/utils';
|
||||
import debug from 'debug';
|
||||
|
||||
import { lambdaClient } from '@/libs/trpc/client';
|
||||
import { type StreamEvent } from '@/services/agentRuntime';
|
||||
import { agentRuntimeClient } from '@/services/agentRuntime';
|
||||
import { type ChatStore } from '@/store/chat/store';
|
||||
import { type StoreSetter } from '@/store/types';
|
||||
import { setNamespace } from '@/utils/storeDebug';
|
||||
@@ -39,8 +37,7 @@ export class ChatGroupChatActionImpl {
|
||||
return;
|
||||
}
|
||||
|
||||
const { internal_handleAgentStreamEvent, optimisticCreateTmpMessage, startOperation } =
|
||||
this.#get();
|
||||
const { optimisticCreateTmpMessage, startOperation } = this.#get();
|
||||
|
||||
log(
|
||||
'sendGroupMessage: agentId=%s, groupId=%s, message=%s',
|
||||
@@ -158,18 +155,10 @@ export class ChatGroupChatActionImpl {
|
||||
return;
|
||||
}
|
||||
|
||||
// 8. Create streaming context - use assistantMessageId from backend response
|
||||
const streamContext = {
|
||||
assistantId: result.assistantMessageId,
|
||||
content: '',
|
||||
reasoning: '',
|
||||
tmpAssistantId: tempAssistantId, // Used for cleanup if needed
|
||||
};
|
||||
|
||||
// 9. Start child operation for SSE stream using backend operationId
|
||||
// 8. Start child operation for gateway stream using backend operationId
|
||||
this.#get().startOperation({
|
||||
context: { ...execContext, messageId: result.assistantMessageId },
|
||||
label: 'Group Agent Stream',
|
||||
label: 'Group Agent Gateway',
|
||||
operationId: result.operationId,
|
||||
parentOperationId: execOperationId,
|
||||
type: 'groupAgentStream',
|
||||
@@ -181,40 +170,20 @@ export class ChatGroupChatActionImpl {
|
||||
this.#get().associateMessageWithOperation(result.assistantMessageId, execOperationId);
|
||||
this.#get().associateMessageWithOperation(result.assistantMessageId, result.operationId);
|
||||
|
||||
// 10. Connect to SSE stream
|
||||
// Server will automatically close the connection after sending agent_runtime_end event
|
||||
const eventSource = agentRuntimeClient.createStreamConnection(result.operationId, {
|
||||
includeHistory: false,
|
||||
onConnect: () => {
|
||||
log('Stream connected to %s', result.operationId);
|
||||
},
|
||||
onDisconnect: () => {
|
||||
log('Stream disconnected from %s', result.operationId);
|
||||
// Complete both operations when stream disconnects (either by server close or client abort)
|
||||
this.#get().completeOperation(result.operationId);
|
||||
this.#get().completeOperation(execOperationId);
|
||||
},
|
||||
onError: (error: Error) => {
|
||||
log('Stream error for %s: %O', result.operationId, error);
|
||||
// Fail the stream operation on error
|
||||
this.#get().failOperation(result.operationId, {
|
||||
message: error.message,
|
||||
type: 'AgentStreamError',
|
||||
});
|
||||
if (streamContext.assistantId) {
|
||||
this.#get().internal_handleAgentError(streamContext.assistantId, error.message);
|
||||
}
|
||||
},
|
||||
onEvent: async (event: StreamEvent) => {
|
||||
await internal_handleAgentStreamEvent(result.operationId, event, streamContext);
|
||||
},
|
||||
});
|
||||
|
||||
// 11. Register cancel handler for aborting SSE stream
|
||||
this.#get().onOperationCancel(result.operationId, () => {
|
||||
log('Cancelling SSE stream for operation %s', result.operationId);
|
||||
eventSource.abort();
|
||||
});
|
||||
// 9. Connect to Agent Gateway via WebSocket (if available)
|
||||
if (this.#get().isAgentGatewayAvailable()) {
|
||||
const chatKey = result.operationId; // Use operationId as chatKey for gateway routing
|
||||
this.#get().internal_connectAgentGateway(chatKey, {
|
||||
assistantId: result.assistantMessageId,
|
||||
execOperationId,
|
||||
streamOperationId: result.operationId,
|
||||
});
|
||||
} else {
|
||||
log('Agent gateway not available, server-side execution will use fallback path');
|
||||
// Complete operations — the backend handles execution independently
|
||||
this.#get().completeOperation(result.operationId);
|
||||
this.#get().completeOperation(execOperationId);
|
||||
}
|
||||
} catch (error) {
|
||||
// Check if this is an abort error (user cancelled the operation)
|
||||
const isAbortError =
|
||||
|
||||
@@ -1,3 +1,12 @@
|
||||
import {
|
||||
type AgentEventMessage,
|
||||
AgentGatewayClient,
|
||||
type IAgentGatewayClient,
|
||||
type InputRequestMessage,
|
||||
type SessionCompleteMessage,
|
||||
type StatusChangeMessage,
|
||||
type ToolConfirmationRequestMessage,
|
||||
} from '@lobechat/agent-gateway-client';
|
||||
import { isDesktop } from '@lobechat/const';
|
||||
import { type ChatToolPayload } from '@lobechat/types';
|
||||
import debug from 'debug';
|
||||
@@ -25,6 +34,15 @@ type Setter = StoreSetter<ChatStore>;
|
||||
export const agentSlice = (set: Setter, get: () => ChatStore, _api?: unknown) =>
|
||||
new AgentActionImpl(set, get, _api);
|
||||
|
||||
/**
|
||||
* Read the agent gateway URL from server config.
|
||||
* Returns undefined if not configured (feature disabled).
|
||||
*/
|
||||
const getAgentGatewayUrl = (): string | undefined => {
|
||||
if (typeof window === 'undefined' || !window.global_serverConfigStore) return undefined;
|
||||
return window.global_serverConfigStore.getState()?.serverConfig?.agentGatewayUrl;
|
||||
};
|
||||
|
||||
export class AgentActionImpl {
|
||||
readonly #get: () => ChatStore;
|
||||
|
||||
@@ -334,6 +352,199 @@ export class AgentActionImpl {
|
||||
}
|
||||
};
|
||||
|
||||
// ─── Agent Gateway (WebSocket) ───
|
||||
|
||||
/**
|
||||
* Check if the Agent Gateway is available (configured via server config).
|
||||
*/
|
||||
isAgentGatewayAvailable = (): boolean => {
|
||||
return !!getAgentGatewayUrl();
|
||||
};
|
||||
|
||||
/**
|
||||
* Connect to Agent Gateway via WebSocket and wire up event handlers.
|
||||
* Returns the gateway client instance (stored on the operation for cancel handling).
|
||||
*/
|
||||
internal_connectAgentGateway = (
|
||||
chatKey: string,
|
||||
params: {
|
||||
assistantId: string;
|
||||
execOperationId: string;
|
||||
streamOperationId: string;
|
||||
},
|
||||
): IAgentGatewayClient | undefined => {
|
||||
const { assistantId, execOperationId, streamOperationId } = params;
|
||||
|
||||
const gatewayUrl = getAgentGatewayUrl();
|
||||
if (!gatewayUrl) {
|
||||
log('Agent gateway URL not configured, skipping WebSocket connection');
|
||||
this.#get().failOperation(streamOperationId, {
|
||||
message: 'Agent gateway URL not configured',
|
||||
type: 'AgentGatewayError',
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
|
||||
// Token is empty for now — the gateway authenticates via cookie/session
|
||||
// forwarded through the WebSocket handshake. When a dedicated gateway JWT
|
||||
// endpoint is added, replace this with the fetched token.
|
||||
const token = '';
|
||||
|
||||
const client = new AgentGatewayClient({ gatewayUrl, token });
|
||||
|
||||
const streamContext = { assistantId, content: '', reasoning: '' };
|
||||
|
||||
client.on('connected', () => {
|
||||
log('Gateway connected for %s', chatKey);
|
||||
});
|
||||
|
||||
client.on('disconnected', () => {
|
||||
log('Gateway disconnected for %s', chatKey);
|
||||
this.#get().completeOperation(streamOperationId);
|
||||
this.#get().completeOperation(execOperationId);
|
||||
});
|
||||
|
||||
client.on('error', (error: Error | Event) => {
|
||||
log('Gateway error for %s: %O', chatKey, error);
|
||||
this.#get().failOperation(streamOperationId, {
|
||||
message: error instanceof Error ? error.message : 'Gateway connection error',
|
||||
type: 'AgentGatewayError',
|
||||
});
|
||||
this.#get().internal_handleAgentError(assistantId, 'Gateway connection error');
|
||||
});
|
||||
|
||||
client.on('agent_event', (message: AgentEventMessage) => {
|
||||
this.internal_handleGatewayAgentEvent(assistantId, message, streamContext);
|
||||
});
|
||||
|
||||
client.on('tool_confirmation_request', (message: ToolConfirmationRequestMessage) => {
|
||||
log('Tool confirmation request: %s', message.toolCallId);
|
||||
this.#get().updateOperationMetadata(streamOperationId, {
|
||||
gatewayClient: client,
|
||||
needsHumanInput: true,
|
||||
pendingApproval: [{ tool: message.tool, toolCallId: message.toolCallId }],
|
||||
});
|
||||
this.#get().internal_toggleMessageLoading(false, assistantId);
|
||||
});
|
||||
|
||||
client.on('input_request', (message: InputRequestMessage) => {
|
||||
log('User input request: %s', message.requestId);
|
||||
this.#get().updateOperationMetadata(streamOperationId, {
|
||||
gatewayClient: client,
|
||||
needsHumanInput: true,
|
||||
pendingPrompt: message.prompt,
|
||||
pendingRequestId: message.requestId,
|
||||
});
|
||||
this.#get().internal_toggleMessageLoading(false, assistantId);
|
||||
});
|
||||
|
||||
client.on('session_complete', (_message: SessionCompleteMessage) => {
|
||||
log('Session complete for %s', chatKey);
|
||||
this.#get().internal_toggleMessageLoading(false, assistantId);
|
||||
this.#get().completeOperation(streamOperationId);
|
||||
this.#get().completeOperation(execOperationId);
|
||||
|
||||
// Mark unread completion for background conversations
|
||||
const op = this.#get().operations[streamOperationId];
|
||||
if (op?.context.agentId) {
|
||||
this.#get().markUnreadCompleted(op.context.agentId, op.context.topicId);
|
||||
}
|
||||
|
||||
// Disconnect after completion
|
||||
client.disconnect();
|
||||
});
|
||||
|
||||
client.on('status_update', (message: StatusChangeMessage) => {
|
||||
log('Status change for %s: %s', chatKey, message.status);
|
||||
if (message.status === 'error' || message.status === 'interrupted') {
|
||||
this.#get().internal_toggleMessageLoading(false, assistantId);
|
||||
}
|
||||
});
|
||||
|
||||
// Register cancel handler
|
||||
this.#get().onOperationCancel(streamOperationId, () => {
|
||||
log('Cancelling gateway connection for %s', chatKey);
|
||||
client.sendInterrupt();
|
||||
client.disconnect();
|
||||
});
|
||||
|
||||
// Connect
|
||||
client.connect(chatKey);
|
||||
|
||||
return client;
|
||||
};
|
||||
|
||||
/**
|
||||
* Handle agent_event messages from the gateway.
|
||||
* Maps gateway AgentStreamEvent kinds to store updates.
|
||||
*/
|
||||
private internal_handleGatewayAgentEvent = (
|
||||
assistantId: string,
|
||||
message: AgentEventMessage,
|
||||
context: { assistantId: string; content: string; reasoning: string },
|
||||
): void => {
|
||||
const { internal_dispatchMessage } = this.#get();
|
||||
const event = message.event;
|
||||
|
||||
switch (event.kind) {
|
||||
case 'text_delta': {
|
||||
context.content += event.content;
|
||||
internal_dispatchMessage({
|
||||
id: assistantId,
|
||||
type: 'updateMessage',
|
||||
value: { content: context.content },
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
case 'thinking': {
|
||||
context.reasoning += event.content;
|
||||
internal_dispatchMessage({
|
||||
id: assistantId,
|
||||
type: 'updateMessage',
|
||||
value: { reasoning: { content: context.reasoning } },
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
case 'tool_call_start': {
|
||||
// Tool call started - could update tools display
|
||||
log('Tool call start: %s (%s)', event.name, event.toolCallId);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'tool_call_delta': {
|
||||
// Tool call arguments streaming
|
||||
break;
|
||||
}
|
||||
|
||||
case 'tool_call_end': {
|
||||
log('Tool call end: %s', event.toolCallId);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'tool_result': {
|
||||
log('Tool result for %s', event.toolCallId);
|
||||
// Refresh messages to display tool results from server
|
||||
this.#get().refreshMessages();
|
||||
break;
|
||||
}
|
||||
|
||||
case 'step_complete': {
|
||||
log('Step %d complete', event.stepIndex);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'message_complete': {
|
||||
log('Message complete: %s', event.messageId);
|
||||
this.#get().internal_toggleMessageLoading(false, assistantId);
|
||||
// Refresh to get final persisted state from server
|
||||
this.#get().refreshMessages();
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
internal_handleHumanIntervention = async (
|
||||
assistantId: string,
|
||||
action: string,
|
||||
@@ -355,12 +566,30 @@ export class AgentActionImpl {
|
||||
try {
|
||||
log(`Handling human intervention ${action} for operation ${messageOpId}:`, data);
|
||||
|
||||
// Send human intervention request
|
||||
await agentRuntimeService.handleHumanIntervention({
|
||||
action: action as any,
|
||||
data,
|
||||
operationId: messageOpId,
|
||||
});
|
||||
// Check if this operation has a gateway client (WebSocket path)
|
||||
const gatewayClient = operation.metadata.gatewayClient as IAgentGatewayClient | undefined;
|
||||
if (gatewayClient) {
|
||||
// Send via WebSocket
|
||||
if (action === 'approve' || action === 'reject') {
|
||||
const toolCallId =
|
||||
data?.toolCallId || operation.metadata.pendingApproval?.[0]?.toolCallId;
|
||||
if (toolCallId) {
|
||||
gatewayClient.sendToolConfirmation(toolCallId, action === 'approve');
|
||||
}
|
||||
} else if (action === 'user_input') {
|
||||
const requestId = data?.requestId || operation.metadata.pendingRequestId;
|
||||
if (requestId && data?.content) {
|
||||
gatewayClient.sendUserInput(requestId, data.content);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Fallback to HTTP for non-gateway operations
|
||||
await agentRuntimeService.handleHumanIntervention({
|
||||
action: action as any,
|
||||
data,
|
||||
operationId: messageOpId,
|
||||
});
|
||||
}
|
||||
|
||||
// Resume loading state
|
||||
this.#get().internal_toggleMessageLoading(true, assistantId);
|
||||
|
||||
Reference in New Issue
Block a user