diff --git a/.agents/skills/local-testing/SKILL.md b/.agents/skills/local-testing/SKILL.md index db4ba57ba6..e27c54874b 100644 --- a/.agents/skills/local-testing/SKILL.md +++ b/.agents/skills/local-testing/SKILL.md @@ -1006,6 +1006,7 @@ Ready-to-use scripts in `.agents/skills/local-testing/scripts/`: | `electron-dev.sh` | Manage Electron dev env (start/stop/status/restart) | | `capture-app-window.sh` | Capture screenshot of a specific app window | | `record-electron-demo.sh` | Record Electron app demo with ffmpeg | +| `record-app-screen.sh` | Record app screen (video + screenshots, start/stop) | | `test-discord-bot.sh` | Send message to Discord bot via osascript | | `test-slack-bot.sh` | Send message to Slack bot via osascript | | `test-telegram-bot.sh` | Send message to Telegram bot via osascript | @@ -1068,25 +1069,16 @@ Each script: activates the app, navigates to the channel/contact, pastes the mes # Screen Recording -Record automated demos by combining `ffmpeg` screen capture with `agent-browser` automation. The script `.agents/skills/local-testing/scripts/record-electron-demo.sh` handles the full lifecycle for Electron. - -### Usage +Record automated demos using `record-app-screen.sh` (start/stop lifecycle, CDP screenshots + ffmpeg assembly). See [references/record-app-screen.md](references/record-app-screen.md) for full documentation. ```bash -# Run the built-in demo (queue-edit feature) -./.agents/skills/local-testing/scripts/record-electron-demo.sh - -# Run a custom automation script -./.agents/skills/local-testing/scripts/record-electron-demo.sh ./my-demo.sh /tmp/my-demo.mp4 +./.agents/skills/local-testing/scripts/electron-dev.sh start +./.agents/skills/local-testing/scripts/record-app-screen.sh start my-demo +# ... run automation ... +./.agents/skills/local-testing/scripts/record-app-screen.sh stop ``` -The script automatically: - -1. Starts Electron with CDP and waits for SPA to load -2. 
Detects window position, screen, and Retina scale via Swift/CGWindowList -3. Records only the Electron window region using `ffmpeg -f avfoundation` with crop -4. Runs the demo (built-in or custom script receiving CDP port as `$1`) -5. Stops recording and cleans up +Outputs to `.records/` directory (gitignored): `<name>.mp4` (video) + `<name>/` (screenshots every 3s). --- diff --git a/.agents/skills/local-testing/references/record-app-screen.md b/.agents/skills/local-testing/references/record-app-screen.md new file mode 100644 index 0000000000..193a5a38bb --- /dev/null +++ b/.agents/skills/local-testing/references/record-app-screen.md @@ -0,0 +1,142 @@ +# record-app-screen.sh + +General-purpose screen recording tool for the Electron app. Captures CDP screenshots as video frames and gallery snapshots, then assembles into an MP4 on stop. + +## Why CDP Screenshots Instead of ffmpeg Screen Capture + +- **Works on any screen** — CDP screenshots capture the browser viewport directly, so external monitors, Retina scaling, and window positioning are all handled automatically +- **No signal handling issues** — ffmpeg-static (npm) produces corrupt MP4 files when killed (missing moov atom). 
CDP screenshots avoid this entirely +- **Consistent output** — Screenshots are resolution-independent and don't require crop coordinate calculations + +## Commands + +```bash +# Start recording (Electron must be running with CDP) +.agents/skills/local-testing/scripts/record-app-screen.sh start [output_name] + +# Stop recording and assemble video +.agents/skills/local-testing/scripts/record-app-screen.sh stop + +# Check if recording is active +.agents/skills/local-testing/scripts/record-app-screen.sh status +``` + +### Arguments + +| Argument | Default | Description | +| ------------- | --------------------------- | -------------------------- | +| `output_name` | `recording-YYYYMMDD-HHMMSS` | Base name for output files | + +### Environment Variables + +| Variable | Default | Description | +| ---------------------- | ------- | -------------------------------------- | +| `CDP_PORT` | `9222` | Chrome DevTools Protocol port | +| `SCREENSHOT_INTERVAL` | `3` | Seconds between gallery screenshots | +| `VIDEO_FRAME_INTERVAL` | `0.5` | Seconds between video frames (\~2 fps) | + +## Output Structure + +``` +.records/ + <name>.mp4 # Video assembled from frames (~2 fps) + <name>/ # Gallery screenshots (every 3s) + 0000.png + 0001.png + 0002.png + ... +``` + +The `.records/` directory is at the project root and is gitignored. + +## How It Works + +### Start + +1. Creates two background loops: + - **Video frames** — `agent-browser screenshot` every `VIDEO_FRAME_INTERVAL` seconds into a temp directory (`/tmp/record-frames-XXXXXX/`) + - **Gallery screenshots** — `agent-browser screenshot` every `SCREENSHOT_INTERVAL` seconds into `.records/<name>/` +2. Saves PIDs and paths to `/tmp/record-app-screen.pids` and `/tmp/record-app-screen.state` + +### Stop + +1. Kills both background loops +2. Assembles video frames into MP4 using ffmpeg: + ``` + ffmpeg -framerate 2 -i frame_%06d.png -c:v libx264 -crf 23 -pix_fmt yuv420p <name>.mp4 + ``` +3. Cleans up temp frame directory +4. 
Reports file sizes and paths + +## Usage Examples + +### Basic Test Recording + +```bash +# Start Electron +.agents/skills/local-testing/scripts/electron-dev.sh start + +# Start recording +.agents/skills/local-testing/scripts/record-app-screen.sh start my-test + +# Run automation +agent-browser --cdp 9222 click @e61 +agent-browser --cdp 9222 type @e42 "hello" +agent-browser --cdp 9222 press Enter +sleep 10 + +# Stop and get results +.agents/skills/local-testing/scripts/record-app-screen.sh stop +# → .records/my-test.mp4 + .records/my-test/*.png +``` + +### Gateway Streaming Demo + +```bash +.agents/skills/local-testing/scripts/electron-dev.sh start + +# Inject gateway URL +agent-browser --cdp 9222 eval --stdin << 'EOF' +(function() { + var store = window.global_serverConfigStore; + store.setState({ serverConfig: { ...store.getState().serverConfig, + agentGatewayUrl: 'https://agent-gateway.lobehub.com' } }); + return 'ready'; +})() +EOF + +# Record +.agents/skills/local-testing/scripts/record-app-screen.sh start gateway-demo + +# Navigate to agent, send message, wait for completion... +# (automation commands here) + +.agents/skills/local-testing/scripts/record-app-screen.sh stop +open .records/gateway-demo.mp4 +``` + +### Check Active Recording + +```bash +.agents/skills/local-testing/scripts/record-app-screen.sh status +# [record] Active recording +# Frames: 42 captured (running: yes) +# Screenshots: 14 captured (running: yes) +# Output: .records/my-test.mp4 +``` + +## Prerequisites + +- **ffmpeg** — For video assembly. Install via `bun add -g ffmpeg-static` or `brew install ffmpeg` +- **agent-browser** — For CDP screenshots. 
Install via `npm i -g agent-browser` +- **Electron app running** — With CDP enabled (use `electron-dev.sh start`) + +## Troubleshooting + +| Problem | Solution | +| ----------------------------------- | ------------------------------------------------------------------------------------------------------------ | +| "No active recording found" on stop | PID file was cleaned up. Check if background processes are still running with `ps aux \| grep agent-browser` | +| "A recording is already active" | Run `stop` first, or manually clean: `rm /tmp/record-app-screen.pids /tmp/record-app-screen.state` | +| Video is 0 bytes | No frames were captured. Ensure Electron is running and CDP port is correct | +| Screenshots are blank/white | SPA may not have loaded yet. Wait for `electron-dev.sh` to report "Renderer ready" | +| ffmpeg assembly fails | Check `/tmp/ffmpeg-assemble.log`. Ensure ffmpeg is installed and frames exist | diff --git a/.agents/skills/local-testing/scripts/record-app-screen.sh b/.agents/skills/local-testing/scripts/record-app-screen.sh new file mode 100755 index 0000000000..853b5a335d --- /dev/null +++ b/.agents/skills/local-testing/scripts/record-app-screen.sh @@ -0,0 +1,189 @@ +#!/usr/bin/env bash +# +# record-app-screen.sh — Record the Electron app window (video + screenshots) +# +# Captures screenshots via agent-browser (CDP), then assembles into video on stop. +# Works on any screen (including external monitors) since it uses CDP, not screen capture. 
+# +# Usage: +# ./record-app-screen.sh start [output_name] # Begin recording +# ./record-app-screen.sh stop # Stop and save +# ./record-app-screen.sh status # Check recording state +# +# Outputs to .records/ directory: +# .records/.mp4 — Video assembled from screenshots (~2 fps) +# .records// — Screenshots every SCREENSHOT_INTERVAL seconds +# +# Prerequisites: +# - ffmpeg installed (bun add -g ffmpeg-static, or brew install ffmpeg) +# - agent-browser CLI installed +# - Electron app already running with CDP enabled +# +# Environment variables: +# CDP_PORT — Chrome DevTools Protocol port (default: 9222) +# SCREENSHOT_INTERVAL — Seconds between gallery screenshots (default: 3) +# VIDEO_FRAME_INTERVAL — Seconds between video frames (default: 0.5) +# +# Examples: +# ./electron-dev.sh start +# ./record-app-screen.sh start gateway-demo +# # ... run automation via agent-browser ... +# ./record-app-screen.sh stop +# +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +PROJECT_DIR="$(cd "$SCRIPT_DIR/../../../.." && pwd)" + +RECORDS_DIR="$PROJECT_DIR/.records" +PID_FILE="/tmp/record-app-screen.pids" +STATE_FILE="/tmp/record-app-screen.state" + +CDP_PORT="${CDP_PORT:-9222}" +SCREENSHOT_INTERVAL="${SCREENSHOT_INTERVAL:-3}" +VIDEO_FRAME_INTERVAL="${VIDEO_FRAME_INTERVAL:-0.5}" + +AB="agent-browser --cdp $CDP_PORT" + +# ─── Commands ─── + +cmd_start() { + local output_name="${1:-recording-$(date +%Y%m%d-%H%M%S)}" + local output_video="$RECORDS_DIR/${output_name}.mp4" + local screenshot_dir="$RECORDS_DIR/${output_name}" + local frames_dir + frames_dir=$(mktemp -d /tmp/record-frames-XXXXXX) + + if [ -f "$PID_FILE" ]; then + echo "[record] A recording is already active. Run '$0 stop' first." 
+ exit 1 + fi + + mkdir -p "$RECORDS_DIR" "$screenshot_dir" + + # Video frames loop (~2 fps via agent-browser CDP screenshots) + ( + local idx=0 + while true; do + local fname + fname=$(printf "%s/frame_%06d.png" "$frames_dir" "$idx") + $AB screenshot "$fname" 2>/dev/null || true + idx=$((idx + 1)) + sleep "$VIDEO_FRAME_INTERVAL" + done + ) & + local frames_pid=$! + + # Gallery screenshots loop (every N seconds for human review) + ( + local idx=0 + while true; do + local fname + fname=$(printf "%s/%04d.png" "$screenshot_dir" "$idx") + $AB screenshot "$fname" 2>/dev/null || true + idx=$((idx + 1)) + sleep "$SCREENSHOT_INTERVAL" + done + ) & + local screenshot_pid=$! + + # Save state + echo "$frames_pid $screenshot_pid" > "$PID_FILE" + echo "$output_video $frames_dir $screenshot_dir" > "$STATE_FILE" + + echo "[record] Started!" + echo " Video frames: every ${VIDEO_FRAME_INTERVAL}s (PID $frames_pid)" + echo " Screenshots: every ${SCREENSHOT_INTERVAL}s → $screenshot_dir/" + echo " Stop with: $0 stop" +} + +cmd_stop() { + if [ ! -f "$PID_FILE" ] || [ ! -f "$STATE_FILE" ]; then + echo "[record] No active recording found." + return 0 + fi + + local frames_pid screenshot_pid + read -r frames_pid screenshot_pid < "$PID_FILE" + + local output_video frames_dir screenshot_dir + read -r output_video frames_dir screenshot_dir < "$STATE_FILE" + + # Stop both capture loops + kill "$frames_pid" 2>/dev/null || true + kill "$screenshot_pid" 2>/dev/null || true + wait "$frames_pid" 2>/dev/null || true + wait "$screenshot_pid" 2>/dev/null || true + + # Assemble frames into video + local frame_count + frame_count=$(ls -1 "$frames_dir"/frame_*.png 2>/dev/null | wc -l | tr -d ' ') + + if [ "$frame_count" -gt 0 ]; then + echo "[record] Assembling $frame_count frames into video..." + ffmpeg -y -framerate 2 -i "$frames_dir/frame_%06d.png" \ + -c:v libx264 -crf 23 -pix_fmt yuv420p -an \ + "$output_video" > /tmp/ffmpeg-assemble.log 2>&1 + + if [ ! 
-s "$output_video" ]; then + echo " [warn] Video assembly failed. Check /tmp/ffmpeg-assemble.log" + echo " Frames preserved in: $frames_dir/" + fi + else + echo " [warn] No frames captured." + fi + + rm -rf "$frames_dir" 2>/dev/null + rm -f "$PID_FILE" "$STATE_FILE" + + local video_size screenshot_count + video_size=$(ls -lh "$output_video" 2>/dev/null | awk '{print $5}' || echo "?") + screenshot_count=$(ls -1 "$screenshot_dir"/*.png 2>/dev/null | wc -l | tr -d ' ' || echo "0") + + echo "[record] Stopped!" + echo " Video: $output_video ($video_size)" + echo " Screenshots: ${screenshot_count} files in $screenshot_dir/" + echo " Play: open $output_video" +} + +cmd_status() { + if [ ! -f "$PID_FILE" ]; then + echo "[record] No active recording." + return 0 + fi + + local frames_pid screenshot_pid + read -r frames_pid screenshot_pid < "$PID_FILE" + + local frames_ok="no" screenshot_ok="no" + kill -0 "$frames_pid" 2>/dev/null && frames_ok="yes" + kill -0 "$screenshot_pid" 2>/dev/null && screenshot_ok="yes" + + if [ -f "$STATE_FILE" ]; then + local output_video frames_dir screenshot_dir + read -r output_video frames_dir screenshot_dir < "$STATE_FILE" + local frame_count ss_count + frame_count=$(ls -1 "$frames_dir"/frame_*.png 2>/dev/null | wc -l | tr -d ' ' || echo "0") + ss_count=$(ls -1 "$screenshot_dir"/*.png 2>/dev/null | wc -l | tr -d ' ' || echo "0") + echo "[record] Active recording" + echo " Frames: $frame_count captured (running: $frames_ok)" + echo " Screenshots: $ss_count captured (running: $screenshot_ok)" + echo " Output: $output_video" + fi +} + +# ─── Main ─── + +case "${1:-}" in + start) shift; cmd_start "$@" ;; + stop) cmd_stop ;; + status) cmd_status ;; + *) + echo "Usage: $0 {start [name] | stop | status}" + echo "" + echo " start [name] Start recording (default: recording-YYYYMMDD-HHMMSS)" + echo " stop Stop recording and save outputs" + echo " status Check if recording is active" + exit 1 + ;; +esac diff --git a/.gitignore b/.gitignore index 
66d0ac8479..78af482035 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,9 @@ Desktop.ini *.code-workspace .vscode/sessions.json prd +# Recordings +.records/ + # Temporary files .temp/ temp/ diff --git a/packages/types/src/serverConfig.ts b/packages/types/src/serverConfig.ts index 5e2592d018..b6da2a5cc1 100644 --- a/packages/types/src/serverConfig.ts +++ b/packages/types/src/serverConfig.ts @@ -47,6 +47,12 @@ export interface ServerModelProviderConfig { export type ServerLanguageModel = Partial>; export interface GlobalServerConfig { + /** + * Agent Gateway URL for WebSocket-based agent execution. + * When set, the SPA can offload agent execution to the server and receive + * events via the Gateway instead of running the agent loop client-side. + */ + agentGatewayUrl?: string; aiProvider: ServerLanguageModel; defaultAgent?: PartialDeep; disableEmailPassword?: boolean; diff --git a/packages/types/src/user/preference.ts b/packages/types/src/user/preference.ts index 6bc2afde91..04fd4d0105 100644 --- a/packages/types/src/user/preference.ts +++ b/packages/types/src/user/preference.ts @@ -38,6 +38,10 @@ export const UserGuideSchema = z.object({ export type UserGuide = z.infer; export const UserLabSchema = z.object({ + /** + * enable server-side agent execution via Gateway WebSocket + */ + enableGatewayMode: z.boolean().optional(), /** * enable multi-agent group chat mode */ diff --git a/src/envs/app.ts b/src/envs/app.ts index 74b8cf8b13..3645593c78 100644 --- a/src/envs/app.ts +++ b/src/envs/app.ts @@ -76,7 +76,7 @@ export const getAppConfig = () => { MARKET_TRUSTED_CLIENT_ID: z.string().optional(), AGENT_GATEWAY_SERVICE_TOKEN: z.string().optional(), - AGENT_GATEWAY_URL: z.string().url(), + AGENT_GATEWAY_URL: z.string().url().optional(), /** * Enable Queue-based Agent Runtime * When true, use QStash for async agent execution (production) @@ -121,7 +121,7 @@ export const getAppConfig = () => { MARKET_TRUSTED_CLIENT_ID: process.env.MARKET_TRUSTED_CLIENT_ID, 
AGENT_GATEWAY_SERVICE_TOKEN: process.env.AGENT_GATEWAY_SERVICE_TOKEN, - AGENT_GATEWAY_URL: process.env.AGENT_GATEWAY_URL || 'https://agent-gateway.lobehub.com', + AGENT_GATEWAY_URL: process.env.AGENT_GATEWAY_URL, enableQueueAgentRuntime: process.env.AGENT_RUNTIME_MODE === 'queue', TELEMETRY_DISABLED: process.env.TELEMETRY_DISABLED === '1', }, diff --git a/src/libs/agent-stream/client.ts b/src/libs/agent-stream/client.ts index c1c072388c..9c800f584e 100644 --- a/src/libs/agent-stream/client.ts +++ b/src/libs/agent-stream/client.ts @@ -126,6 +126,7 @@ export class AgentStreamClient extends TypedEmitter { this.intentionalDisconnect = true; this.cleanup(); this.setStatus('disconnected'); + this.emit('disconnected'); } /** @@ -168,6 +169,12 @@ export class AgentStreamClient extends TypedEmitter { } private buildWsUrl(): string { + // If the URL already has a ws/wss protocol, use it directly + if (this.gatewayUrl.startsWith('ws://') || this.gatewayUrl.startsWith('wss://')) { + const base = this.gatewayUrl.replace(/\/+$/, ''); + return `${base}/ws?operationId=${encodeURIComponent(this.operationId)}`; + } + // Otherwise convert http(s) to ws(s) const wsProtocol = this.gatewayUrl.startsWith('https') ? 
'wss' : 'ws'; const host = this.gatewayUrl.replace(/^https?:\/\//, ''); return `${wsProtocol}://${host}/ws?operationId=${encodeURIComponent(this.operationId)}`; diff --git a/src/libs/agent-stream/index.ts b/src/libs/agent-stream/index.ts index 90bef21f52..d4529157aa 100644 --- a/src/libs/agent-stream/index.ts +++ b/src/libs/agent-stream/index.ts @@ -5,6 +5,10 @@ export type { AgentStreamEvent, AgentStreamEventType, ConnectionStatus, + StepCompleteData, StreamChunkData, StreamChunkType, + StreamStartData, + ToolEndData, + ToolStartData, } from './types'; diff --git a/src/libs/agent-stream/types.ts b/src/libs/agent-stream/types.ts index 38153d1385..43305d9933 100644 --- a/src/libs/agent-stream/types.ts +++ b/src/libs/agent-stream/types.ts @@ -44,6 +44,33 @@ export interface StreamChunkData { toolsCalling?: any[]; } +// ─── Typed Event Data ─── + +export interface StreamStartData { + assistantMessage: { id: string }; + model?: string; + provider?: string; +} + +export interface ToolStartData { + parentMessageId: string; + toolCalling: Record; +} + +export interface ToolEndData { + executionTime?: number; + isSuccess: boolean; + payload?: Record; + result?: unknown; +} + +export interface StepCompleteData { + finalState?: unknown; + phase: string; + reason?: string; + reasonDetail?: string; +} + // ─── WebSocket Protocol Messages ─── // Client → Server diff --git a/src/locales/default/labs.ts b/src/locales/default/labs.ts index ba80a918ab..e3286b2a97 100644 --- a/src/locales/default/labs.ts +++ b/src/locales/default/labs.ts @@ -2,6 +2,9 @@ export default { 'features.assistantMessageGroup.desc': 'Group agent messages and their tool call results together for display', 'features.assistantMessageGroup.title': 'Agent Message Grouping', + 'features.gatewayMode.desc': + 'Execute agent tasks on the server via Gateway WebSocket instead of running locally. 
Enables faster execution and reduces client resource usage.', + 'features.gatewayMode.title': 'Server-Side Agent Execution (Gateway)', 'features.groupChat.desc': 'Enable multi-agent group chat coordination.', 'features.groupChat.title': 'Group Chat (Multi-Agent)', 'features.inputMarkdown.desc': diff --git a/src/routes/(main)/settings/advanced/index.tsx b/src/routes/(main)/settings/advanced/index.tsx index 06933996cd..b1a36f6ce8 100644 --- a/src/routes/(main)/settings/advanced/index.tsx +++ b/src/routes/(main)/settings/advanced/index.tsx @@ -13,6 +13,7 @@ import { useTranslation } from 'react-i18next'; import { FORM_STYLE } from '@/const/layoutTokens'; import SettingHeader from '@/routes/(main)/settings/features/SettingHeader'; import { autoUpdateService } from '@/services/electron/autoUpdate'; +import { useServerConfigStore } from '@/store/serverConfig'; import { useUserStore } from '@/store/user'; import { labPreferSelectors, preferenceSelectors, settingsSelectors } from '@/store/user/selectors'; @@ -34,11 +35,16 @@ const Page = memo(() => { const [setSettings, isUserStateInit] = useUserStore((s) => [s.setSettings, s.isUserStateInit]); const [loading, setLoading] = useState(false); - const [isPreferenceInit, enableInputMarkdown, updateLab] = useUserStore((s) => [ - preferenceSelectors.isPreferenceInit(s), - labPreferSelectors.enableInputMarkdown(s), - s.updateLab, - ]); + const [isPreferenceInit, enableInputMarkdown, enableGatewayMode, updateLab] = useUserStore( + (s) => [ + preferenceSelectors.isPreferenceInit(s), + labPreferSelectors.enableInputMarkdown(s), + labPreferSelectors.enableGatewayMode(s), + s.updateLab, + ], + ); + + const hasGatewayUrl = useServerConfigStore((s) => !!s.serverConfig.agentGatewayUrl); const [channel, setChannel] = useState('stable'); @@ -112,6 +118,23 @@ const Page = memo(() => { label: tLabs('features.inputMarkdown.title'), minWidth: undefined, }, + ...(hasGatewayUrl + ? 
[ + { + children: ( + updateLab({ enableGatewayMode: checked })} + /> + ), + className: styles.labItem, + desc: tLabs('features.gatewayMode.desc'), + label: tLabs('features.gatewayMode.title'), + minWidth: undefined, + }, + ] + : []), ], title: tLabs('title'), }; diff --git a/src/server/globalConfig/index.ts b/src/server/globalConfig/index.ts index ae6c2c553c..65ce017a76 100644 --- a/src/server/globalConfig/index.ts +++ b/src/server/globalConfig/index.ts @@ -85,6 +85,11 @@ export const getServerGlobalConfig = async () => { ), enableUploadFileToServer: !!fileEnv.S3_SECRET_ACCESS_KEY, + // Expose Agent Gateway URL to client when queue-based agent runtime is enabled + ...(appEnv.enableQueueAgentRuntime && appEnv.AGENT_GATEWAY_URL + ? { agentGatewayUrl: appEnv.AGENT_GATEWAY_URL } + : undefined), + image: cleanObject({ defaultImageNum: imageEnv.AI_IMAGE_DEFAULT_IMAGE_NUM, }), diff --git a/src/server/modules/AgentRuntime/factory.ts b/src/server/modules/AgentRuntime/factory.ts index 4d72c504ac..79666418cc 100644 --- a/src/server/modules/AgentRuntime/factory.ts +++ b/src/server/modules/AgentRuntime/factory.ts @@ -72,7 +72,7 @@ export const createStreamEventManager = (): IStreamEventManager => { } // Wrap with Gateway notifier when configured - if (appEnv.AGENT_GATEWAY_SERVICE_TOKEN) { + if (appEnv.AGENT_GATEWAY_URL && appEnv.AGENT_GATEWAY_SERVICE_TOKEN) { log('Wrapping with GatewayStreamNotifier (%s)', appEnv.AGENT_GATEWAY_URL); return new GatewayStreamNotifier( manager, diff --git a/src/services/aiAgent.ts b/src/services/aiAgent.ts index 7f838ac57b..da1c379aa7 100644 --- a/src/services/aiAgent.ts +++ b/src/services/aiAgent.ts @@ -1,5 +1,9 @@ +import type { ExecAgentResult } from '@lobechat/types'; + import { lambdaClient } from '@/libs/trpc/client'; +export type { ExecAgentResult }; + export interface ExecAgentTaskParams { agentId?: string; appContext?: { @@ -91,9 +95,10 @@ export interface UpdateClientTaskThreadStatusParams { class AiAgentService { /** - * Execute a 
single Agent task + * Execute a single Agent task. + * Returns the operationId needed to connect to the Agent Gateway. */ - async execAgentTask(params: ExecAgentTaskParams) { + async execAgentTask(params: ExecAgentTaskParams): Promise { return await lambdaClient.aiAgent.execAgent.mutate(params); } diff --git a/src/store/chat/slices/aiChat/actions/__tests__/gateway.test.ts b/src/store/chat/slices/aiChat/actions/__tests__/gateway.test.ts new file mode 100644 index 0000000000..caa9d81496 --- /dev/null +++ b/src/store/chat/slices/aiChat/actions/__tests__/gateway.test.ts @@ -0,0 +1,239 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import type { AgentStreamEvent } from '@/libs/agent-stream'; + +import type { GatewayConnection } from '../gateway'; +import { GatewayActionImpl } from '../gateway'; + +// ─── Mock Client Factory ─── + +function createMockClient(): GatewayConnection['client'] & { + emitEvent: (event: string, ...args: any[]) => void; +} { + const listeners = new Map void>>(); + + return { + connect: vi.fn(), + disconnect: vi.fn(), + emitEvent(event: string, ...args: any[]) { + listeners.get(event)?.forEach((listener) => listener(...args)); + }, + on: vi.fn((event: string, listener: (...args: any[]) => void) => { + let set = listeners.get(event); + if (!set) { + set = new Set(); + listeners.set(event, set); + } + set.add(listener); + }), + sendInterrupt: vi.fn(), + }; +} + +// ─── Test Helpers ─── + +function createTestAction() { + const state: Record = { gatewayConnections: {} }; + const set = vi.fn((updater: any) => { + if (typeof updater === 'function') { + Object.assign(state, updater(state)); + } else { + Object.assign(state, updater); + } + }); + const get = vi.fn(() => state as any); + + const action = new GatewayActionImpl(set as any, get, undefined); + + // Inject mock client factory + const mockClient = createMockClient(); + action.createClient = vi.fn(() => mockClient); + + return { action, get, mockClient, set, state }; +} + 
+describe('GatewayActionImpl', () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + describe('connectToGateway', () => { + it('should create client and add to store', () => { + const { action, mockClient, state } = createTestAction(); + + action.connectToGateway({ + gatewayUrl: 'https://gateway.test.com', + operationId: 'op-1', + token: 'test-token', + }); + + expect(state.gatewayConnections['op-1']).toBeDefined(); + expect(state.gatewayConnections['op-1'].status).toBe('connecting'); + expect(mockClient.connect).toHaveBeenCalledOnce(); + }); + + it('should wire up status_changed listener', () => { + const { action, mockClient, state } = createTestAction(); + + action.connectToGateway({ + gatewayUrl: 'https://gateway.test.com', + operationId: 'op-1', + token: 'test-token', + }); + + mockClient.emitEvent('status_changed', 'connected'); + expect(state.gatewayConnections['op-1'].status).toBe('connected'); + }); + + it('should forward agent events to onEvent callback', () => { + const { action, mockClient } = createTestAction(); + const events: AgentStreamEvent[] = []; + + action.connectToGateway({ + gatewayUrl: 'https://gateway.test.com', + onEvent: (e) => events.push(e), + operationId: 'op-1', + token: 'test-token', + }); + + const testEvent: AgentStreamEvent = { + data: { content: 'hello' }, + operationId: 'op-1', + stepIndex: 0, + timestamp: Date.now(), + type: 'stream_chunk', + }; + mockClient.emitEvent('agent_event', testEvent); + + expect(events).toHaveLength(1); + expect(events[0]).toEqual(testEvent); + }); + + it('should cleanup on session_complete', () => { + const { action, mockClient, state } = createTestAction(); + const onComplete = vi.fn(); + + action.connectToGateway({ + gatewayUrl: 'https://gateway.test.com', + onSessionComplete: onComplete, + operationId: 'op-1', + token: 'test-token', + }); + + mockClient.emitEvent('session_complete'); + expect(state.gatewayConnections['op-1']).toBeUndefined(); + expect(onComplete).toHaveBeenCalledOnce(); + 
}); + + it('should cleanup on disconnected', () => { + const { action, mockClient, state } = createTestAction(); + + action.connectToGateway({ + gatewayUrl: 'https://gateway.test.com', + operationId: 'op-1', + token: 'test-token', + }); + + mockClient.emitEvent('disconnected'); + expect(state.gatewayConnections['op-1']).toBeUndefined(); + }); + + it('should cleanup on auth_failed', () => { + const { action, mockClient, state } = createTestAction(); + + action.connectToGateway({ + gatewayUrl: 'https://gateway.test.com', + operationId: 'op-1', + token: 'test-token', + }); + + mockClient.emitEvent('auth_failed', 'invalid token'); + expect(state.gatewayConnections['op-1']).toBeUndefined(); + }); + + it('should disconnect existing connection before creating new one', () => { + const { action, state } = createTestAction(); + + // First connection with its own mock + const firstMock = createMockClient(); + action.createClient = vi.fn(() => firstMock); + action.connectToGateway({ + gatewayUrl: 'https://gateway.test.com', + operationId: 'op-1', + token: 'token-1', + }); + + // Second connection + const secondMock = createMockClient(); + action.createClient = vi.fn(() => secondMock); + action.connectToGateway({ + gatewayUrl: 'https://gateway.test.com', + operationId: 'op-1', + token: 'token-2', + }); + + expect(firstMock.disconnect).toHaveBeenCalled(); + expect(state.gatewayConnections['op-1'].client).toBe(secondMock); + }); + }); + + describe('disconnectFromGateway', () => { + it('should disconnect and cleanup', () => { + const { action, mockClient, state } = createTestAction(); + + action.connectToGateway({ + gatewayUrl: 'https://gateway.test.com', + operationId: 'op-1', + token: 'test-token', + }); + + action.disconnectFromGateway('op-1'); + expect(mockClient.disconnect).toHaveBeenCalled(); + expect(state.gatewayConnections['op-1']).toBeUndefined(); + }); + + it('should be a no-op for unknown operationId', () => { + const { action } = createTestAction(); + 
action.disconnectFromGateway('nonexistent'); + }); + }); + + describe('interruptGatewayAgent', () => { + it('should send interrupt to the client', () => { + const { action, mockClient } = createTestAction(); + + action.connectToGateway({ + gatewayUrl: 'https://gateway.test.com', + operationId: 'op-1', + token: 'test-token', + }); + + action.interruptGatewayAgent('op-1'); + expect(mockClient.sendInterrupt).toHaveBeenCalledOnce(); + }); + + it('should be a no-op for unknown operationId', () => { + const { action } = createTestAction(); + action.interruptGatewayAgent('nonexistent'); + }); + }); + + describe('getGatewayConnectionStatus', () => { + it('should return status for active connection', () => { + const { action } = createTestAction(); + + action.connectToGateway({ + gatewayUrl: 'https://gateway.test.com', + operationId: 'op-1', + token: 'test-token', + }); + + expect(action.getGatewayConnectionStatus('op-1')).toBe('connecting'); + }); + + it('should return undefined for unknown operationId', () => { + const { action } = createTestAction(); + expect(action.getGatewayConnectionStatus('nonexistent')).toBeUndefined(); + }); + }); +}); diff --git a/src/store/chat/slices/aiChat/actions/__tests__/gatewayEventHandler.test.ts b/src/store/chat/slices/aiChat/actions/__tests__/gatewayEventHandler.test.ts new file mode 100644 index 0000000000..be1598a7ce --- /dev/null +++ b/src/store/chat/slices/aiChat/actions/__tests__/gatewayEventHandler.test.ts @@ -0,0 +1,367 @@ +import { describe, expect, it, vi } from 'vitest'; + +import type { AgentStreamEvent } from '@/libs/agent-stream'; + +import { createGatewayEventHandler } from '../gatewayEventHandler'; + +vi.mock('@/services/message', () => ({ + messageService: { getMessages: vi.fn().mockResolvedValue([]) }, +})); + +// ─── Test Helpers ─── + +function createMockStore() { + return { + associateMessageWithOperation: vi.fn(), + completeOperation: vi.fn(), + internal_dispatchMessage: vi.fn(), + 
internal_toggleToolCallingStreaming: vi.fn(), + replaceMessages: vi.fn(), + }; +} + +function createHandler( + store: ReturnType, + overrides?: { assistantMessageId?: string }, +) { + const get = vi.fn(() => store) as any; + return createGatewayEventHandler(get, { + assistantMessageId: overrides?.assistantMessageId ?? 'msg-initial', + context: { agentId: 'agent-1', scope: 'session', topicId: 'topic-1' } as any, + operationId: 'op-1', + }); +} + +function makeEvent(type: AgentStreamEvent['type'], data?: any): AgentStreamEvent { + return { data, id: '1', operationId: 'op-1', stepIndex: 0, timestamp: Date.now(), type }; +} + +/** Flush the async processing queue by draining microtasks + setTimeout queue */ +const flush = async () => { + for (let i = 0; i < 5; i++) { + await new Promise((r) => setTimeout(r, 15)); + } +}; + +// ─── Tests ─── + +describe('createGatewayEventHandler', () => { + describe('stream_start', () => { + it('should associate new message with operation', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-step2' } })); + await flush(); + + expect(store.associateMessageWithOperation).toHaveBeenCalledWith('msg-step2', 'op-1'); + expect(store.replaceMessages).toHaveBeenCalled(); + }); + + it('should keep current ID if event data has no assistantMessage', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('stream_start', {})); + await flush(); + + // No new message to associate, but fetch still happens + expect(store.associateMessageWithOperation).not.toHaveBeenCalled(); + expect(store.replaceMessages).toHaveBeenCalled(); + }); + + it('should reset accumulators on each stream_start', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + // Accumulate some content + handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'hello' })); + await 
flush(); + expect(store.internal_dispatchMessage).toHaveBeenCalledWith( + expect.objectContaining({ value: { content: 'hello' } }), + { operationId: 'op-1' }, + ); + + // New stream_start resets + handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-step2' } })); + handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'world' })); + await flush(); + + // Content should be 'world', not 'helloworld' + expect(store.internal_dispatchMessage).toHaveBeenLastCalledWith( + expect.objectContaining({ + id: 'msg-step2', + value: { content: 'world' }, + }), + { operationId: 'op-1' }, + ); + }); + }); + + describe('stream_chunk', () => { + it('should accumulate text content and pass operationId context', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Hello' })); + handler(makeEvent('stream_chunk', { chunkType: 'text', content: ' world' })); + await flush(); + + expect(store.internal_dispatchMessage).toHaveBeenLastCalledWith( + { + id: 'msg-initial', + type: 'updateMessage', + value: { content: 'Hello world' }, + }, + { operationId: 'op-1' }, + ); + }); + + it('should accumulate reasoning content', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('stream_chunk', { chunkType: 'reasoning', reasoning: 'Think' })); + handler(makeEvent('stream_chunk', { chunkType: 'reasoning', reasoning: 'ing...' })); + await flush(); + + expect(store.internal_dispatchMessage).toHaveBeenLastCalledWith( + { + id: 'msg-initial', + type: 'updateMessage', + value: { reasoning: { content: 'Thinking...' 
} }, + }, + { operationId: 'op-1' }, + ); + }); + + it('should dispatch tools and toggle tool calling streaming', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + const toolsCalling = [{ id: 'tc-1' }, { id: 'tc-2' }]; + handler(makeEvent('stream_chunk', { chunkType: 'tools_calling', toolsCalling })); + await flush(); + + expect(store.internal_dispatchMessage).toHaveBeenCalledWith( + { + id: 'msg-initial', + type: 'updateMessage', + value: { tools: toolsCalling }, + }, + { operationId: 'op-1' }, + ); + + expect(store.internal_toggleToolCallingStreaming).toHaveBeenCalledWith('msg-initial', [ + true, + true, + ]); + }); + + it('should ignore chunk with no data', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('stream_chunk', undefined)); + await flush(); + + expect(store.internal_dispatchMessage).not.toHaveBeenCalled(); + }); + }); + + describe('stream_end', () => { + it('should clear tool streaming only', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('stream_end')); + await flush(); + + expect(store.internal_toggleToolCallingStreaming).toHaveBeenCalledWith( + 'msg-initial', + undefined, + ); + }); + }); + + describe('tool_start', () => { + it('should be a no-op (loading already active from stream_start)', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('tool_start', { parentMessageId: 'msg-initial', toolCalling: {} })); + await flush(); + + expect(store.internal_dispatchMessage).not.toHaveBeenCalled(); + expect(store.replaceMessages).not.toHaveBeenCalled(); + }); + }); + + describe('tool_end', () => { + it('should refresh messages to pull tool results', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('tool_end', { isSuccess: true })); + await flush(); + + 
expect(store.replaceMessages).toHaveBeenCalled(); + }); + }); + + describe('step_complete', () => { + it('should refresh on execution_complete phase', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('step_complete', { phase: 'execution_complete', reason: 'done' })); + await flush(); + + expect(store.replaceMessages).toHaveBeenCalled(); + }); + + it('should not refresh on other phases', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('step_complete', { phase: 'human_approval' })); + await flush(); + + expect(store.replaceMessages).not.toHaveBeenCalled(); + }); + }); + + describe('agent_runtime_end', () => { + it('should complete operation and refresh messages', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('agent_runtime_end')); + await flush(); + + expect(store.completeOperation).toHaveBeenCalledWith('op-1'); + expect(store.replaceMessages).toHaveBeenCalled(); + }); + }); + + describe('error', () => { + it('should dispatch error to current message with operationId context', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('error', { message: 'Something went wrong' })); + await flush(); + + expect(store.internal_dispatchMessage).toHaveBeenCalledWith( + { + id: 'msg-initial', + type: 'updateMessage', + value: { + error: { body: { message: 'Something went wrong' }, type: 'AgentRuntimeError' }, + }, + }, + { operationId: 'op-1' }, + ); + }); + + it('should dispatch error to switched message ID', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-step2' } })); + handler(makeEvent('error', { error: 'Timeout' })); + await flush(); + + expect(store.internal_dispatchMessage).toHaveBeenCalledWith( + 
expect.objectContaining({ id: 'msg-step2' }), + { operationId: 'op-1' }, + ); + }); + }); + + describe('sequential processing', () => { + it('should process stream_chunk only after stream_start refresh completes', async () => { + const store = createMockStore(); + const callOrder: string[] = []; + + const { messageService } = await import('@/services/message'); + (messageService.getMessages as any).mockImplementation(async () => { + callOrder.push('refresh_start'); + await new Promise((r) => setTimeout(r, 10)); + callOrder.push('refresh_end'); + return []; + }); + store.internal_dispatchMessage.mockImplementation(() => { + callOrder.push('dispatch'); + }); + store.associateMessageWithOperation.mockImplementation(() => { + callOrder.push('associate'); + }); + + const handler = createHandler(store); + + handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-new' } })); + handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Hello' })); + await flush(); + + const refreshEndIdx = callOrder.indexOf('refresh_end'); + const dispatchIdx = callOrder.indexOf('dispatch'); + expect(refreshEndIdx).toBeGreaterThan(-1); + expect(dispatchIdx).toBeGreaterThan(refreshEndIdx); + }); + }); + + describe('multi-step integration', () => { + it('should handle full LLM → tools → LLM cycle', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + // Step 1: LLM call + handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-1' } })); + await flush(); + expect(store.associateMessageWithOperation).toHaveBeenCalledWith('msg-1', 'op-1'); + + handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Let me search.' })); + await flush(); + expect(store.internal_dispatchMessage).toHaveBeenCalledWith( + expect.objectContaining({ id: 'msg-1', value: { content: 'Let me search.' 
} }), + { operationId: 'op-1' }, + ); + + const tools = [{ id: 'tc-1' }]; + handler(makeEvent('stream_chunk', { chunkType: 'tools_calling', toolsCalling: tools })); + await flush(); + expect(store.internal_toggleToolCallingStreaming).toHaveBeenCalledWith('msg-1', [true]); + + handler(makeEvent('stream_end')); + await flush(); + // Loading stays active between steps — only tool streaming is cleared + expect(store.internal_toggleToolCallingStreaming).toHaveBeenCalledWith('msg-1', undefined); + + // Tool execution + handler(makeEvent('tool_start', { parentMessageId: 'msg-1', toolCalling: tools[0] })); + handler(makeEvent('tool_end', { isSuccess: true })); + await flush(); + expect(store.replaceMessages).toHaveBeenCalled(); + + // Step 2: Next LLM call with new assistant message + vi.clearAllMocks(); + handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-2' } })); + await flush(); + expect(store.replaceMessages).toHaveBeenCalled(); + expect(store.associateMessageWithOperation).toHaveBeenCalledWith('msg-2', 'op-1'); + + handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Here are the results.' })); + await flush(); + expect(store.internal_dispatchMessage).toHaveBeenCalledWith( + expect.objectContaining({ id: 'msg-2', value: { content: 'Here are the results.' 
} }), + { operationId: 'op-1' }, + ); + + handler(makeEvent('stream_end')); + handler(makeEvent('agent_runtime_end')); + await flush(); + expect(store.completeOperation).toHaveBeenCalledWith('op-1'); + }); + }); +}); diff --git a/src/store/chat/slices/aiChat/actions/conversationLifecycle.ts b/src/store/chat/slices/aiChat/actions/conversationLifecycle.ts index 92ec3e249a..925f83b4f5 100644 --- a/src/store/chat/slices/aiChat/actions/conversationLifecycle.ts +++ b/src/store/chat/slices/aiChat/actions/conversationLifecycle.ts @@ -48,7 +48,6 @@ import { parseSelectedToolsFromEditorData, processCommands, } from './commandBus'; - /** * Extended params for sendMessage with context */ @@ -584,6 +583,31 @@ export class ConversationLifecycleActionImpl { } // ── AI execution ── + + // Gateway mode: server-side execution via WebSocket (opt-in via Labs toggle) + if (this.#get().isGatewayModeEnabled()) { + try { + await this.#get().executeGatewayAgent({ + assistantMessageId: data.assistantMessageId, + context: execContext, + message, + parentOperationId: operationId, + topicId: data.topicId, + userMessageId: data.userMessageId, + }); + } catch (e) { + console.error('[Gateway] Failed to start server-side agent:', e); + if (data.topicId) this.#get().internal_updateTopicLoading(data.topicId, false); + } + + return { + assistantMessageId: data.assistantMessageId, + createdThreadId: data.createdThreadId, + userMessageId: data.userMessageId, + }; + } + + // Client mode: run agent loop locally { const displayMessages = displayMessageSelectors.getDisplayMessagesByKey( messageMapKey(execContext), diff --git a/src/store/chat/slices/aiChat/actions/gateway.ts b/src/store/chat/slices/aiChat/actions/gateway.ts new file mode 100644 index 0000000000..55d2b6d8c2 --- /dev/null +++ b/src/store/chat/slices/aiChat/actions/gateway.ts @@ -0,0 +1,239 @@ +import type { ConversationContext } from '@lobechat/types'; + +import type { + AgentStreamClientOptions, + AgentStreamEvent, + ConnectionStatus, +} 
from '@/libs/agent-stream'; +import { AgentStreamClient } from '@/libs/agent-stream/client'; +import { aiAgentService } from '@/services/aiAgent'; +import type { ChatStore } from '@/store/chat/store'; +import type { StoreSetter } from '@/store/types'; +import { useUserStore } from '@/store/user'; + +import { createGatewayEventHandler } from './gatewayEventHandler'; + +type Setter = StoreSetter; + +// ─── Types ─── + +export interface GatewayConnection { + client: Pick; + status: ConnectionStatus; +} + +export interface ConnectGatewayParams { + /** + * Gateway WebSocket URL (e.g. https://agent-gateway.lobehub.com) + */ + gatewayUrl: string; + /** + * Callback for each agent event received + */ + onEvent?: (event: AgentStreamEvent) => void; + /** + * Called when the session completes (agent_runtime_end or session_complete) + */ + onSessionComplete?: () => void; + /** + * The operation ID returned by execAgent + */ + operationId: string; + /** + * Auth token for the Gateway + */ + token: string; +} + +// ─── Action Implementation ─── + +export class GatewayActionImpl { + readonly #get: () => ChatStore; + readonly #set: Setter; + + /** Overridable factory for testing */ + createClient: (options: AgentStreamClientOptions) => GatewayConnection['client'] = (options) => + new AgentStreamClient(options); + + constructor(set: Setter, get: () => ChatStore, _api?: unknown) { + void _api; + this.#set = set; + this.#get = get; + } + + /** + * Connect to the Agent Gateway for a specific operation. + * Creates an AgentStreamClient, manages its lifecycle, and wires up event callbacks. 
+ */ + connectToGateway = (params: ConnectGatewayParams): void => { + const { operationId, gatewayUrl, token, onEvent, onSessionComplete } = params; + + // Disconnect existing connection for this operation if any + this.disconnectFromGateway(operationId); + + const client = this.createClient({ gatewayUrl, operationId, token }); + + // Track connection in store + this.#set( + (state) => ({ + gatewayConnections: { + ...state.gatewayConnections, + [operationId]: { client, status: 'connecting' }, + }, + }), + false, + 'connectToGateway', + ); + + // Wire up status changes + client.on('status_changed', (status) => { + this.#set( + (state) => { + const conn = state.gatewayConnections[operationId]; + if (!conn) return state; + return { + gatewayConnections: { ...state.gatewayConnections, [operationId]: { ...conn, status } }, + }; + }, + false, + 'gateway/statusChanged', + ); + }); + + // Forward agent events to caller + if (onEvent) { + client.on('agent_event', onEvent); + } + + // Handle session completion + client.on('session_complete', () => { + this.internal_cleanupGatewayConnection(operationId); + onSessionComplete?.(); + }); + + // Handle disconnection (terminal events auto-disconnect the client) + client.on('disconnected', () => { + this.internal_cleanupGatewayConnection(operationId); + }); + + // Handle auth failures + client.on('auth_failed', (reason) => { + console.error(`[Gateway] Auth failed for operation ${operationId}: ${reason}`); + this.internal_cleanupGatewayConnection(operationId); + }); + + client.connect(); + }; + + /** + * Disconnect from the Gateway for a specific operation. + */ + disconnectFromGateway = (operationId: string): void => { + const conn = this.#get().gatewayConnections[operationId]; + if (!conn) return; + + conn.client.disconnect(); + this.internal_cleanupGatewayConnection(operationId); + }; + + /** + * Send an interrupt command to stop the agent for a specific operation. 
+ */ + interruptGatewayAgent = (operationId: string): void => { + const conn = this.#get().gatewayConnections[operationId]; + if (!conn) return; + + conn.client.sendInterrupt(); + }; + + /** + * Get the connection status for a specific operation. + */ + getGatewayConnectionStatus = (operationId: string): ConnectionStatus | undefined => { + return this.#get().gatewayConnections[operationId]?.status; + }; + + /** + * Check if Gateway mode is available and enabled. + * Returns true if both server config and user lab toggle are set. + */ + isGatewayModeEnabled = (): boolean => { + const agentGatewayUrl = + window.global_serverConfigStore?.getState()?.serverConfig?.agentGatewayUrl; + const enableGatewayMode = useUserStore.getState().preference.lab?.enableGatewayMode; + + return !!agentGatewayUrl && !!enableGatewayMode; + }; + + /** + * Execute agent task via Gateway WebSocket. + * Call isGatewayModeEnabled() first to check availability. + */ + executeGatewayAgent = async (params: { + assistantMessageId: string; + context: ConversationContext; + message: string; + parentOperationId: string; + topicId?: string; + userMessageId: string; + }): Promise => { + const { assistantMessageId, context, message, parentOperationId, topicId, userMessageId } = + params; + + const agentGatewayUrl = + window.global_serverConfigStore!.getState().serverConfig.agentGatewayUrl!; + + const result = await aiAgentService.execAgentTask({ + agentId: context.agentId, + appContext: { + groupId: context.groupId, + scope: context.scope, + threadId: context.threadId, + topicId: context.topicId, + }, + existingMessageIds: [userMessageId, assistantMessageId], + prompt: message, + }); + + // Create a dedicated operation for gateway execution with correct context + const { operationId: gatewayOpId } = this.#get().startOperation({ + context, + parentOperationId, + type: 'execServerAgentRuntime', + }); + + // Associate the initial assistant message with the gateway operation + // so the UI shows 
loading/generating state via the operation system + this.#get().associateMessageWithOperation(assistantMessageId, gatewayOpId); + + const eventHandler = createGatewayEventHandler(this.#get, { + assistantMessageId, + context, + operationId: gatewayOpId, + }); + + this.#get().connectToGateway({ + gatewayUrl: agentGatewayUrl, + onEvent: eventHandler, + onSessionComplete: () => { + this.#get().completeOperation(gatewayOpId); + if (topicId) this.#get().internal_updateTopicLoading(topicId, false); + }, + operationId: result.operationId, + token: result.token || '', + }); + }; + + private internal_cleanupGatewayConnection = (operationId: string): void => { + this.#set( + (state) => { + const { [operationId]: _, ...rest } = state.gatewayConnections; + return { gatewayConnections: rest }; + }, + false, + 'gateway/cleanup', + ); + }; +} + +export type GatewayAction = Pick; diff --git a/src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts b/src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts new file mode 100644 index 0000000000..c7e23518d1 --- /dev/null +++ b/src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts @@ -0,0 +1,198 @@ +import type { ConversationContext } from '@lobechat/types'; + +import type { + AgentStreamEvent, + StepCompleteData, + StreamChunkData, + StreamStartData, +} from '@/libs/agent-stream'; +import { messageService } from '@/services/message'; +import type { ChatStore } from '@/store/chat/store'; + +/** + * Fetch messages from DB and replace them in the chat store's dbMessagesMap. 
+ * This updates the ConversationArea component via React subscription: + * dbMessagesMap → ConversationArea (messages prop) → ConversationStore → UI + */ +const fetchAndReplaceMessages = async (get: () => ChatStore, context: ConversationContext) => { + const messages = await messageService.getMessages(context); + get().replaceMessages(messages, { context }); +}; + +/** + * Creates a handler function that processes Agent Gateway events + * and maps them to the chat store's message update actions. + * + * Supports multi-step agent execution (LLM → tool calls → next LLM → ...) + * using a hybrid approach: + * - Current LLM step: real-time streaming via stream_chunk + * - Step transitions: fetchAndReplaceMessages from DB at stream_start / tool_end / step_complete + * + * The handler queues incoming events and processes them sequentially, + * ensuring that stream_chunk waits for stream_start's DB fetch to resolve + * before dispatching updates. + */ +export const createGatewayEventHandler = ( + get: () => ChatStore, + params: { + assistantMessageId: string; + context: ConversationContext; + operationId: string; + }, +) => { + const { context, operationId } = params; + + // Dispatch context — ensures internal_dispatchMessage resolves the correct messageMapKey + const dispatchContext = { operationId }; + + // Mutable — switches to new assistant message ID on each stream_start + let currentAssistantMessageId = params.assistantMessageId; + + // Accumulated content from stream chunks (reset on each stream_start) + let accumulatedContent = ''; + let accumulatedReasoning = ''; + + // Sequential processing queue — ensures stream_chunk waits for stream_start's fetch + let processingChain: Promise = Promise.resolve(); + + const enqueue = (fn: () => Promise | void): void => { + processingChain = processingChain.then(fn, fn); + }; + + return (event: AgentStreamEvent) => { + switch (event.type) { + case 'stream_start': { + enqueue(async () => { + const data = event.data as 
StreamStartData | undefined; + + const newAssistantMessageId = data?.assistantMessage?.id; + + // Switch to the new assistant message created by the server for this step + if (newAssistantMessageId) { + currentAssistantMessageId = newAssistantMessageId; + // Associate the new message with the operation so UI shows generating state + get().associateMessageWithOperation(currentAssistantMessageId, operationId); + } + + // Reset accumulators for the new stream + accumulatedContent = ''; + accumulatedReasoning = ''; + + // Fetch from DB so the new message exists in dbMessagesMap before chunks arrive + await fetchAndReplaceMessages(get, context).catch(console.error); + }); + break; + } + + case 'stream_chunk': { + enqueue(() => { + const data = event.data as StreamChunkData | undefined; + if (!data) return; + + if (data.chunkType === 'text' && data.content) { + accumulatedContent += data.content; + get().internal_dispatchMessage( + { + id: currentAssistantMessageId, + type: 'updateMessage', + value: { content: accumulatedContent }, + }, + dispatchContext, + ); + } + + if (data.chunkType === 'reasoning' && data.reasoning) { + accumulatedReasoning += data.reasoning; + get().internal_dispatchMessage( + { + id: currentAssistantMessageId, + type: 'updateMessage', + value: { reasoning: { content: accumulatedReasoning } }, + }, + dispatchContext, + ); + } + + if (data.chunkType === 'tools_calling' && data.toolsCalling) { + get().internal_dispatchMessage( + { + id: currentAssistantMessageId, + type: 'updateMessage', + value: { tools: data.toolsCalling }, + }, + dispatchContext, + ); + + // Drive tool calling animation + get().internal_toggleToolCallingStreaming( + currentAssistantMessageId, + data.toolsCalling.map(() => true), + ); + } + }); + break; + } + + case 'stream_end': { + enqueue(() => { + // Only clear tool calling streaming — keep message loading active + // until agent_runtime_end so users don't think the session ended + // during tool execution gaps between steps + 
get().internal_toggleToolCallingStreaming(currentAssistantMessageId, undefined); + }); + break; + } + + case 'tool_start': { + // Server creates tool messages in DB. + // Loading is already active from stream_start (not cleared by stream_end). + break; + } + + case 'tool_end': { + enqueue(async () => { + await fetchAndReplaceMessages(get, context).catch(console.error); + }); + break; + } + + case 'step_complete': { + const data = event.data as StepCompleteData | undefined; + + // Refresh on execution_complete to ensure final step state is consistent + if (data?.phase === 'execution_complete') { + enqueue(async () => { + await fetchAndReplaceMessages(get, context).catch(console.error); + }); + } + break; + } + + case 'agent_runtime_end': { + enqueue(async () => { + get().internal_toggleToolCallingStreaming(currentAssistantMessageId, undefined); + get().completeOperation(operationId); + await fetchAndReplaceMessages(get, context).catch(console.error); + }); + break; + } + + case 'error': { + enqueue(() => { + const errorMsg = event.data?.message || event.data?.error || 'Unknown error'; + get().internal_dispatchMessage( + { + id: currentAssistantMessageId, + type: 'updateMessage', + value: { + error: { body: { message: errorMsg }, type: 'AgentRuntimeError' }, + }, + }, + dispatchContext, + ); + }); + break; + } + } + }; +}; diff --git a/src/store/chat/slices/aiChat/actions/index.ts b/src/store/chat/slices/aiChat/actions/index.ts index 664ee43843..1729bed546 100644 --- a/src/store/chat/slices/aiChat/actions/index.ts +++ b/src/store/chat/slices/aiChat/actions/index.ts @@ -7,6 +7,8 @@ import { type ConversationControlAction } from './conversationControl'; import { ConversationControlActionImpl } from './conversationControl'; import { type ConversationLifecycleAction } from './conversationLifecycle'; import { ConversationLifecycleActionImpl } from './conversationLifecycle'; +import { type GatewayAction } from './gateway'; +import { GatewayActionImpl } from './gateway'; 
import { type ChatMemoryAction } from './memory'; import { ChatMemoryActionImpl } from './memory'; import { type StreamingExecutorAction } from './streamingExecutor'; @@ -17,6 +19,7 @@ import { StreamingStatesActionImpl } from './streamingStates'; export type ChatAIChatAction = ChatMemoryAction & ConversationLifecycleAction & ConversationControlAction & + GatewayAction & StreamingExecutorAction & StreamingStatesAction; @@ -34,6 +37,7 @@ export const chatAiChat: StateCreator< new ChatMemoryActionImpl(...params), new ConversationLifecycleActionImpl(...params), new ConversationControlActionImpl(...params), + new GatewayActionImpl(...params), new StreamingExecutorActionImpl(...params), new StreamingStatesActionImpl(...params), ]); diff --git a/src/store/chat/slices/aiChat/initialState.ts b/src/store/chat/slices/aiChat/initialState.ts index c0adb1116f..25e2ab814c 100644 --- a/src/store/chat/slices/aiChat/initialState.ts +++ b/src/store/chat/slices/aiChat/initialState.ts @@ -1,6 +1,11 @@ import { type ChatInputEditor } from '@/features/ChatInput'; +import type { GatewayConnection } from '@/store/chat/slices/aiChat/actions/gateway'; export interface ChatAIChatState { + /** + * Active Agent Gateway WebSocket connections, keyed by operationId + */ + gatewayConnections: Record; inputFiles: File[]; inputMessage: string; mainInputEditor: ChatInputEditor | null; @@ -13,6 +18,7 @@ export interface ChatAIChatState { } export const initialAiChatState: ChatAIChatState = { + gatewayConnections: {}, inputFiles: [], inputMessage: '', mainInputEditor: null, diff --git a/src/store/user/slices/preference/selectors/labPrefer.ts b/src/store/user/slices/preference/selectors/labPrefer.ts index 2d870b316c..5235e3e2db 100644 --- a/src/store/user/slices/preference/selectors/labPrefer.ts +++ b/src/store/user/slices/preference/selectors/labPrefer.ts @@ -3,6 +3,7 @@ import { DEFAULT_PREFERENCE } from '@lobechat/const'; import { type UserState } from '@/store/user/initialState'; export const 
labPreferSelectors = { + enableGatewayMode: (s: UserState): boolean => s.preference.lab?.enableGatewayMode ?? false, enableInputMarkdown: (s: UserState): boolean => s.preference.lab?.enableInputMarkdown ?? DEFAULT_PREFERENCE.lab!.enableInputMarkdown!, };