diff --git a/.agents/skills/bot/SKILL.md b/.agents/skills/bot/SKILL.md index 164ee12c7c..63045f685c 100644 --- a/.agents/skills/bot/SKILL.md +++ b/.agents/skills/bot/SKILL.md @@ -76,7 +76,7 @@ The router caches loaded bots in memory. Cache is **invalidated** by `BotMessage 2. Calls `execAgent` with `stepWebhook` and `completionWebhook` pointing at `${INTERNAL_APP_URL ?? APP_URL}/api/agent/webhooks/bot-callback`, plus `webhookDelivery: 'qstash'`. 3. Returns immediately; the bridge `finally` block keeps the active-thread marker held until the `completion` callback fires. -`/api/agent/webhooks/bot-callback/route.ts` verifies the QStash signature and hands off to `BotCallbackService.handleCallback`: +`POST /api/agent/webhooks/bot-callback` (`src/server/agent-hono/handlers/botCallback.ts`) verifies the QStash signature via the `qstashAuth` middleware and hands off to `BotCallbackService.handleCallback`: - `type: 'step'` → `handleStep` re-renders `renderStepProgress`, edits `progressMessageId` (skipped if `displayToolCalls=false` or platform `supportsMessageEdit=false`). - `type: 'completion'` → `handleCompletion` writes the final reply (or error/interrupted message), removes the 👀 reaction, clears active-thread tracker, fires async `summarizeTopicTitle`. @@ -140,12 +140,12 @@ Webhook platforms run fine in serverless functions. Persistent platforms (`webso - On Vercel + webhook mode → start the client inline (one HTTP call). - Off-Vercel → `GatewayManager` singleton holds long-lived clients in process. -**`GET /api/agent/gateway/route.ts`** (cron, `Bearer ${CRON_SECRET}`): +**`GET /api/agent/gateway`** (`src/server/agent-hono/handlers/gatewayCron.ts`, cron, `Bearer ${CRON_SECRET}`): - Iterates registered platforms and starts every enabled persistent provider with `durationMs = 10min`, then in `after(...)` polls `BotConnectQueue` every 30s for new connect requests, until the window expires. - `getEffectiveConnectionMode(platform, settings)` is the only place that resolves per-provider mode — respect it everywhere. -**`POST /api/agent/gateway/start/route.ts`** is the non-Vercel `ensureRunning` entry point (`Bearer ${KEY_VAULTS_SECRET}`). +**`POST /api/agent/gateway/start`** (`src/server/agent-hono/handlers/gatewayStart.ts`) is the non-Vercel `ensureRunning` entry point (`Bearer ${KEY_VAULTS_SECRET}`). **Runtime status** is stored in Redis at `bot:runtime-status:platform:appId` with TTL ≈ `durationMs + 60s`. States: `starting | connected | disconnected | failed | queued`. Updated by each `PlatformClient.start/stop` and by the gateway service. @@ -226,11 +226,11 @@ Client service: `src/services/agentBotProvider.ts`. Store actions: `src/store/ag ## Key Files ```plaintext -Webhook routes: - src/app/(backend)/api/agent/webhooks/[platform]/[[...appId]]/route.ts — inbound catch-all - src/app/(backend)/api/agent/webhooks/bot-callback/route.ts — qstash bot callback - src/app/(backend)/api/agent/gateway/route.ts — cron gateway (10min window) - src/app/(backend)/api/agent/gateway/start/route.ts — non-Vercel ensureRunning +Webhook routes (mounted via `src/app/(backend)/api/agent/[[...route]]/route.ts` → `src/server/agent-hono`): + src/server/agent-hono/handlers/platformWebhook.ts — inbound catch-all (POST /webhooks/:platform/:appId?) + src/server/agent-hono/handlers/botCallback.ts — qstash bot callback + src/server/agent-hono/handlers/gatewayCron.ts — cron gateway (10min window) + src/server/agent-hono/handlers/gatewayStart.ts — non-Vercel ensureRunning Bot service: src/server/services/bot/index.ts — barrel diff --git a/src/app/(backend)/api/agent/gateway/start/route.ts b/src/app/(backend)/api/agent/gateway/start/route.ts deleted file mode 100644 index a0d0c3d01c..0000000000 --- a/src/app/(backend)/api/agent/gateway/start/route.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { NextResponse } from 'next/server'; - -import { getServerDBConfig } from '@/config/db'; -import { GatewayService } from '@/server/services/gateway'; - -export const POST = async (req: Request): Promise => { - const { KEY_VAULTS_SECRET } = getServerDBConfig(); - - const authHeader = req.headers.get('authorization'); - if (authHeader !== `Bearer ${KEY_VAULTS_SECRET}`) { - return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }); - } - - const body = await req.json().catch(() => ({})); - const service = new GatewayService(); - - try { - if (body.restart) { - console.info('[GatewayService] Restarting...'); - await service.stop(); - } - - await service.ensureRunning(); - console.info('[GatewayService] Started successfully'); - - return NextResponse.json({ status: body.restart ? 'restarted' : 'started' }); - } catch (error) { - console.error('[GatewayService] Failed to start:', error); - return NextResponse.json({ error: 'Failed to start gateway' }, { status: 500 }); - } -}; diff --git a/src/app/(backend)/api/agent/webhooks/[platform]/[[...appId]]/route.ts b/src/app/(backend)/api/agent/webhooks/[platform]/[[...appId]]/route.ts deleted file mode 100644 index d21f075b99..0000000000 --- a/src/app/(backend)/api/agent/webhooks/[platform]/[[...appId]]/route.ts +++ /dev/null @@ -1,30 +0,0 @@ -import debug from 'debug'; - -import { getBotMessageRouter } from '@/server/services/bot'; - -const log = debug('lobe-server:bot:webhook-route'); - -/** - * Unified webhook endpoint for Chat SDK bot platforms. - * - * Handles both generic and bot-specific webhook URLs: - * - POST /api/agent/webhooks/[platform] - * - POST /api/agent/webhooks/[platform]/[appId] - * - * Using an optional catch-all `[[...appId]]` ensures both patterns are served - * by a single serverless function, avoiding deployment issues with nested - * dynamic segments on Vercel. - */ -export const POST = async ( - req: Request, - { params }: { params: Promise<{ appId?: string[]; platform: string }> }, -): Promise => { - const { platform, appId: appIdSegments } = await params; - const appId = appIdSegments?.[0]; - - log('Received webhook: platform=%s, appId=%s, url=%s', platform, appId ?? '(none)', req.url); - - const router = getBotMessageRouter(); - const handler = router.getWebhookHandler(platform, appId); - return handler(req); -}; diff --git a/src/app/(backend)/api/agent/webhooks/bot-callback/route.ts b/src/app/(backend)/api/agent/webhooks/bot-callback/route.ts deleted file mode 100644 index 0cca7b4b55..0000000000 --- a/src/app/(backend)/api/agent/webhooks/bot-callback/route.ts +++ /dev/null @@ -1,64 +0,0 @@ -import debug from 'debug'; -import { NextResponse } from 'next/server'; - -import { getServerDB } from '@/database/core/db-adaptor'; -import { verifyQStashSignature } from '@/libs/qstash'; -import { BotCallbackService } from '@/server/services/bot/BotCallbackService'; - -const log = debug('api-route:agent:bot-callback'); - -/** - * Bot callback endpoint for agent step/completion webhooks. - * - * In queue mode, AgentRuntimeService fires webhooks (via QStash) after each step - * and on completion. This endpoint verifies the signature and delegates to BotCallbackService. - * - * Route: POST /api/agent/webhooks/bot-callback - */ -export async function POST(request: Request): Promise { - const rawBody = await request.text(); - - const isValid = await verifyQStashSignature(request, rawBody); - if (!isValid) { - return NextResponse.json({ error: 'Invalid signature' }, { status: 401 }); - } - - const body = JSON.parse(rawBody); - - const { type, applicationId, platformThreadId, progressMessageId } = body; - - log( - 'bot-callback: type=%s, applicationId=%s, platformThreadId=%s, progressMessageId=%s', - type, - applicationId, - platformThreadId, - progressMessageId, - ); - - if (!type || !applicationId || !platformThreadId) { - return NextResponse.json( - { - error: 'Missing required fields: type, applicationId, platformThreadId', - }, - { status: 400 }, - ); - } - - if (type !== 'step' && type !== 'completion') { - return NextResponse.json({ error: `Unknown callback type: ${type}` }, { status: 400 }); - } - - try { - const serverDB = await getServerDB(); - const service = new BotCallbackService(serverDB); - await service.handleCallback(body); - - return NextResponse.json({ success: true }); - } catch (error) { - console.error('bot-callback error:', error); - return NextResponse.json( - { error: error instanceof Error ? error.message : 'Internal error' }, - { status: 500 }, - ); - } -} diff --git a/src/server/agent-hono/handlers/__tests__/botCallback.test.ts b/src/server/agent-hono/handlers/__tests__/botCallback.test.ts new file mode 100644 index 0000000000..a2c6961ab8 --- /dev/null +++ b/src/server/agent-hono/handlers/__tests__/botCallback.test.ts @@ -0,0 +1,110 @@ +// @vitest-environment node +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { botCallback } from '../botCallback'; + +const mockHandleCallback = vi.fn(); + +vi.mock('@/server/services/bot/BotCallbackService', () => ({ + BotCallbackService: vi.fn().mockImplementation(() => ({ + handleCallback: mockHandleCallback, + })), +})); + +vi.mock('@/database/core/db-adaptor', () => ({ + getServerDB: vi.fn().mockResolvedValue({} as any), +})); + +function buildContext(opts: { body?: unknown; jsonThrows?: boolean }) { + const captures: Array<{ body: any; status: number }> = []; + const ctx = { + json: (b: any, status = 200) => { + captures.push({ body: b, status }); + return Response.json(b, { status }); + }, + req: { + json: opts.jsonThrows + ? async () => { + throw new Error('bad json'); + } + : async () => opts.body, + }, + } as any; + return { ctx, getCaptures: () => captures }; +} + +const validStepBody = { + applicationId: 'app-1', + platformThreadId: 'thread-1', + progressMessageId: 'msg-1', + type: 'step', +}; + +describe('botCallback handler', () => { + beforeEach(() => { + mockHandleCallback.mockReset(); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it('returns 400 when JSON parsing throws', async () => { + const { ctx } = buildContext({ jsonThrows: true }); + const res = await botCallback(ctx); + expect(res.status).toBe(400); + expect(mockHandleCallback).not.toHaveBeenCalled(); + }); + + it.each([ + ['type', { ...validStepBody, type: undefined }], + ['applicationId', { ...validStepBody, applicationId: undefined }], + ['platformThreadId', { ...validStepBody, platformThreadId: undefined }], + ])('returns 400 when required field %s is missing', async (_field, body) => { + const { ctx, getCaptures } = buildContext({ body }); + const res = await botCallback(ctx); + expect(res.status).toBe(400); + expect(getCaptures()[0].body.error).toMatch(/Missing required fields/); + expect(mockHandleCallback).not.toHaveBeenCalled(); + }); + + it('returns 400 for unknown callback types', async () => { + const { ctx, getCaptures } = buildContext({ + body: { ...validStepBody, type: 'unknown' }, + }); + const res = await botCallback(ctx); + expect(res.status).toBe(400); + expect(getCaptures()[0].body.error).toBe('Unknown callback type: unknown'); + }); + + it('delegates to BotCallbackService and returns 200 on happy path', async () => { + mockHandleCallback.mockResolvedValue(undefined); + const { ctx, getCaptures } = buildContext({ body: validStepBody }); + + const res = await botCallback(ctx); + + expect(res.status).toBe(200); + expect(getCaptures()[0].body).toEqual({ success: true }); + expect(mockHandleCallback).toHaveBeenCalledWith(validStepBody); + }); + + it('accepts type=completion', async () => { + mockHandleCallback.mockResolvedValue(undefined); + const body = { ...validStepBody, type: 'completion' }; + const { ctx } = buildContext({ body }); + + const res = await botCallback(ctx); + expect(res.status).toBe(200); + expect(mockHandleCallback).toHaveBeenCalledWith(body); + }); + + it('returns 500 with the error message when the service throws', async () => { + mockHandleCallback.mockRejectedValue(new Error('service down')); + const { ctx, getCaptures } = buildContext({ body: validStepBody }); + + const res = await botCallback(ctx); + + expect(res.status).toBe(500); + expect(getCaptures()[0].body).toEqual({ error: 'service down' }); + }); +}); diff --git a/src/server/agent-hono/handlers/__tests__/gatewayCallback.test.ts b/src/server/agent-hono/handlers/__tests__/gatewayCallback.test.ts new file mode 100644 index 0000000000..9e1ccc0e8d --- /dev/null +++ b/src/server/agent-hono/handlers/__tests__/gatewayCallback.test.ts @@ -0,0 +1,160 @@ +// @vitest-environment node +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { gatewayCallback } from '../gatewayCallback'; + +const { gatewayEnvState, mockUpdateBotRuntimeStatus } = vi.hoisted(() => ({ + gatewayEnvState: {} as { + MESSAGE_GATEWAY_ENABLED?: string; + MESSAGE_GATEWAY_SERVICE_TOKEN?: string; + }, + mockUpdateBotRuntimeStatus: vi.fn(), +})); + +vi.mock('@/envs/gateway', () => ({ + gatewayEnv: new Proxy(gatewayEnvState, { + get: (target, prop: string) => target[prop as keyof typeof target], + }), +})); + +vi.mock('@/server/services/gateway/runtimeStatus', () => ({ + BOT_RUNTIME_STATUSES: { + connected: 'connected', + disconnected: 'disconnected', + dormant: 'dormant', + failed: 'failed', + queued: 'queued', + starting: 'starting', + }, + updateBotRuntimeStatus: mockUpdateBotRuntimeStatus, +})); + +function buildContext(opts: { authHeader?: string; body?: unknown; jsonThrows?: boolean }) { + const captures: Array<{ body: any; status: number }> = []; + const ctx = { + body: (b: any, status: number) => { + captures.push({ body: b, status }); + return new Response(b, { status }); + }, + json: (b: any, status = 200) => { + captures.push({ body: b, status }); + return Response.json(b, { status }); + }, + req: { + header: (name: string) => + name.toLowerCase() === 'authorization' ? opts.authHeader : undefined, + json: opts.jsonThrows + ? async () => { + throw new Error('bad json'); + } + : async () => opts.body, + }, + } as any; + return { ctx, getCaptures: () => captures }; +} + +const validBody = { + applicationId: 'app-1', + connectionId: 'conn-1', + platform: 'discord', + state: { status: 'connected' }, +}; + +describe('gatewayCallback handler', () => { + beforeEach(() => { + mockUpdateBotRuntimeStatus.mockReset(); + gatewayEnvState.MESSAGE_GATEWAY_ENABLED = '1'; + gatewayEnvState.MESSAGE_GATEWAY_SERVICE_TOKEN = 'token-xyz'; + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it('returns 204 when MESSAGE_GATEWAY_ENABLED is not "1"', async () => { + gatewayEnvState.MESSAGE_GATEWAY_ENABLED = '0'; + + const { ctx, getCaptures } = buildContext({ authHeader: 'Bearer wrong', body: validBody }); + const res = await gatewayCallback(ctx); + + expect(res.status).toBe(204); + expect(getCaptures()[0]).toEqual({ body: null, status: 204 }); + expect(mockUpdateBotRuntimeStatus).not.toHaveBeenCalled(); + }); + + it('returns 503 when service token is unset', async () => { + gatewayEnvState.MESSAGE_GATEWAY_SERVICE_TOKEN = undefined; + const { ctx } = buildContext({ authHeader: 'Bearer x', body: validBody }); + const res = await gatewayCallback(ctx); + expect(res.status).toBe(503); + }); + + it('returns 401 on bearer mismatch', async () => { + const { ctx } = buildContext({ authHeader: 'Bearer wrong', body: validBody }); + const res = await gatewayCallback(ctx); + expect(res.status).toBe(401); + }); + + it('returns 400 on JSON parse failure', async () => { + const { ctx } = buildContext({ authHeader: 'Bearer token-xyz', jsonThrows: true }); + const res = await gatewayCallback(ctx); + expect(res.status).toBe(400); + }); + + it('returns 400 on zod validation failure (missing connectionId)', async () => { + const { ctx, getCaptures } = buildContext({ + authHeader: 'Bearer token-xyz', + body: { ...validBody, connectionId: undefined }, + }); + const res = await gatewayCallback(ctx); + expect(res.status).toBe(400); + expect(getCaptures()[0].body.error).toBe('Invalid body'); + }); + + it('returns 204 silently when applicationId is missing (no status update)', async () => { + const { ctx } = buildContext({ + authHeader: 'Bearer token-xyz', + body: { ...validBody, applicationId: undefined }, + }); + const res = await gatewayCallback(ctx); + expect(res.status).toBe(204); + expect(mockUpdateBotRuntimeStatus).not.toHaveBeenCalled(); + }); + + it('returns 204 without writing status when state is "connecting"', async () => { + const { ctx } = buildContext({ + authHeader: 'Bearer token-xyz', + body: { ...validBody, state: { status: 'connecting' } }, + }); + const res = await gatewayCallback(ctx); + expect(res.status).toBe(204); + expect(mockUpdateBotRuntimeStatus).not.toHaveBeenCalled(); + }); + + it.each([ + ['connected', 'connected'], + ['disconnected', 'disconnected'], + ['dormant', 'dormant'], + ['error', 'failed'], + ])('maps state.status=%s to runtimeStatus=%s and writes once', async (incoming, expected) => { + mockUpdateBotRuntimeStatus.mockResolvedValue(undefined); + const { ctx } = buildContext({ + authHeader: 'Bearer token-xyz', + body: { + ...validBody, + state: { error: 'oops', status: incoming }, + }, + }); + + const res = await gatewayCallback(ctx); + + expect(res.status).toBe(204); + expect(mockUpdateBotRuntimeStatus).toHaveBeenCalledTimes(1); + expect(mockUpdateBotRuntimeStatus).toHaveBeenCalledWith({ + applicationId: 'app-1', + errorMessage: 'oops', + platform: 'discord', + status: expected, + }); + }); +}); diff --git a/src/server/agent-hono/handlers/__tests__/gatewayStart.test.ts b/src/server/agent-hono/handlers/__tests__/gatewayStart.test.ts new file mode 100644 index 0000000000..2edaa549c9 --- /dev/null +++ b/src/server/agent-hono/handlers/__tests__/gatewayStart.test.ts @@ -0,0 +1,88 @@ +// @vitest-environment node +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { gatewayStart } from '../gatewayStart'; + +const { mockEnsureRunning, mockStop } = vi.hoisted(() => ({ + mockEnsureRunning: vi.fn(), + mockStop: vi.fn(), +})); + +vi.mock('@/server/services/gateway', () => ({ + GatewayService: vi.fn().mockImplementation(() => ({ + ensureRunning: mockEnsureRunning, + stop: mockStop, + })), +})); + +function buildContext(opts: { body?: unknown; jsonThrows?: boolean }) { + const captures: Array<{ body: any; status: number }> = []; + const ctx = { + json: (b: any, status = 200) => { + captures.push({ body: b, status }); + return Response.json(b, { status }); + }, + req: { + json: opts.jsonThrows + ? async () => { + throw new Error('bad json'); + } + : async () => opts.body, + }, + } as any; + return { ctx, getCaptures: () => captures }; +} + +describe('gatewayStart handler', () => { + beforeEach(() => { + mockEnsureRunning.mockReset().mockResolvedValue(undefined); + mockStop.mockReset().mockResolvedValue(undefined); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it('returns status="started" and only calls ensureRunning when restart is falsy', async () => { + const { ctx, getCaptures } = buildContext({ body: {} }); + const res = await gatewayStart(ctx); + + expect(res.status).toBe(200); + expect(getCaptures()[0].body).toEqual({ status: 'started' }); + expect(mockStop).not.toHaveBeenCalled(); + expect(mockEnsureRunning).toHaveBeenCalledTimes(1); + }); + + it('treats a malformed JSON body as empty {} and starts the gateway', async () => { + const { ctx, getCaptures } = buildContext({ jsonThrows: true }); + const res = await gatewayStart(ctx); + + expect(res.status).toBe(200); + expect(getCaptures()[0].body).toEqual({ status: 'started' }); + expect(mockEnsureRunning).toHaveBeenCalledTimes(1); + }); + + it('stops then ensures running when restart=true and reports "restarted"', async () => { + const { ctx, getCaptures } = buildContext({ body: { restart: true } }); + const res = await gatewayStart(ctx); + + expect(res.status).toBe(200); + expect(getCaptures()[0].body).toEqual({ status: 'restarted' }); + expect(mockStop).toHaveBeenCalledTimes(1); + expect(mockEnsureRunning).toHaveBeenCalledTimes(1); + // stop must be called before ensureRunning + expect(mockStop.mock.invocationCallOrder[0]).toBeLessThan( + mockEnsureRunning.mock.invocationCallOrder[0], + ); + }); + + it('returns 500 when ensureRunning throws', async () => { + mockEnsureRunning.mockRejectedValue(new Error('boom')); + const { ctx, getCaptures } = buildContext({ body: {} }); + + const res = await gatewayStart(ctx); + + expect(res.status).toBe(500); + expect(getCaptures()[0].body).toEqual({ error: 'Failed to start gateway' }); + }); +}); diff --git a/src/server/agent-hono/handlers/__tests__/platformWebhook.test.ts b/src/server/agent-hono/handlers/__tests__/platformWebhook.test.ts new file mode 100644 index 0000000000..c6f5463989 --- /dev/null +++ b/src/server/agent-hono/handlers/__tests__/platformWebhook.test.ts @@ -0,0 +1,88 @@ +// @vitest-environment node +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { platformWebhook } from '../platformWebhook'; + +const { mockGetWebhookHandler, mockGetBotMessageRouter } = vi.hoisted(() => { + const handler = vi.fn(); + return { + mockGetBotMessageRouter: vi.fn(() => ({ getWebhookHandler: handler })), + mockGetWebhookHandler: handler, + }; +}); + +vi.mock('@/server/services/bot', () => ({ + getBotMessageRouter: mockGetBotMessageRouter, +})); + +function buildContext(opts: { params: Record; url?: string }) { + const rawRequest = new Request(opts.url ?? 'http://x/api/agent/webhooks/telegram/app-1', { + method: 'POST', + body: '{}', + }); + const captures: Array<{ body: any; status: number }> = []; + const ctx = { + json: (b: any, status = 200) => { + captures.push({ body: b, status }); + return Response.json(b, { status }); + }, + req: { + param: (name: string) => opts.params[name], + raw: rawRequest, + url: rawRequest.url, + }, + } as any; + return { ctx, getCaptures: () => captures, rawRequest }; +} + +describe('platformWebhook handler', () => { + beforeEach(() => { + mockGetWebhookHandler.mockReset(); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it('returns 400 when platform param is missing', async () => { + const { ctx, getCaptures } = buildContext({ params: { platform: undefined } }); + + const res = await platformWebhook(ctx); + + expect(res.status).toBe(400); + expect(getCaptures()[0].body).toEqual({ error: 'platform is required' }); + expect(mockGetWebhookHandler).not.toHaveBeenCalled(); + }); + + it('forwards platform + appId + raw request to the bot message router', async () => { + const platformResponse = new Response('ok', { status: 202 }); + const innerHandler = vi.fn().mockResolvedValue(platformResponse); + mockGetWebhookHandler.mockReturnValue(innerHandler); + + const { ctx, rawRequest } = buildContext({ + params: { appId: 'app-1', platform: 'telegram' }, + }); + + const res = await platformWebhook(ctx); + + expect(res).toBe(platformResponse); + expect(mockGetWebhookHandler).toHaveBeenCalledWith('telegram', 'app-1'); + // Platform-side signature verification depends on the original Request, + // so the handler must receive `c.req.raw` verbatim. + expect(innerHandler).toHaveBeenCalledTimes(1); + expect(innerHandler.mock.calls[0][0]).toBe(rawRequest); + }); + + it('passes appId=undefined when only platform is provided', async () => { + mockGetWebhookHandler.mockImplementation(() => async () => new Response(null, { status: 204 })); + + const { ctx } = buildContext({ + params: { platform: 'discord' }, + url: 'http://x/api/agent/webhooks/discord', + }); + + await platformWebhook(ctx); + + expect(mockGetWebhookHandler).toHaveBeenCalledWith('discord', undefined); + }); +}); diff --git a/src/server/agent-hono/handlers/__tests__/runStep.test.ts b/src/server/agent-hono/handlers/__tests__/runStep.test.ts new file mode 100644 index 0000000000..11db8c6f12 --- /dev/null +++ b/src/server/agent-hono/handlers/__tests__/runStep.test.ts @@ -0,0 +1,194 @@ +// @vitest-environment node +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { runStep, runStepHealth } from '../runStep'; + +const mockGetOperationMetadata = vi.fn(); +const mockExecuteStep = vi.fn(); + +vi.mock('@/server/modules/AgentRuntime', () => ({ + AgentRuntimeCoordinator: vi.fn().mockImplementation(() => ({ + getOperationMetadata: mockGetOperationMetadata, + })), +})); + +vi.mock('@/server/services/agentRuntime', () => ({ + AgentRuntimeService: vi.fn().mockImplementation(() => ({ + executeStep: mockExecuteStep, + })), +})); + +vi.mock('@/database/core/db-adaptor', () => ({ + getServerDB: vi.fn().mockResolvedValue({} as any), +})); + +function buildContext(opts: { body?: unknown; jsonThrows?: boolean; retried?: string }) { + const captures: Array<{ body: any; status: number; headers?: Record }> = []; + const ctx = { + json: (b: any, status = 200, headers?: Record) => { + captures.push({ body: b, status, headers }); + return Response.json(b, { status, headers }); + }, + req: { + header: (name: string) => + name.toLowerCase() === 'upstash-retried' ? opts.retried : undefined, + json: opts.jsonThrows + ? async () => { + throw new Error('bad json'); + } + : async () => opts.body, + }, + } as any; + return { ctx, getCaptures: () => captures }; +} + +const validBody = { + context: { foo: 'bar' }, + operationId: 'op-1', + stepIndex: 2, +}; + +describe('runStep handler', () => { + beforeEach(() => { + mockGetOperationMetadata.mockReset(); + mockExecuteStep.mockReset(); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it('returns 400 when JSON parsing throws', async () => { + const { ctx, getCaptures } = buildContext({ jsonThrows: true }); + const res = await runStep(ctx); + expect(res.status).toBe(400); + expect(getCaptures()[0].body).toEqual({ error: 'Invalid JSON body' }); + expect(mockGetOperationMetadata).not.toHaveBeenCalled(); + }); + + it('returns 400 when operationId is missing', async () => { + const { ctx } = buildContext({ body: { stepIndex: 0 } }); + const res = await runStep(ctx); + expect(res.status).toBe(400); + expect(mockGetOperationMetadata).not.toHaveBeenCalled(); + }); + + it('returns 401 when operation metadata has no userId', async () => { + mockGetOperationMetadata.mockResolvedValue(null); + const { ctx, getCaptures } = buildContext({ body: validBody }); + + const res = await runStep(ctx); + + expect(res.status).toBe(401); + expect(getCaptures()[0].body).toEqual({ error: 'Invalid operation or unauthorized' }); + expect(mockExecuteStep).not.toHaveBeenCalled(); + }); + + it('returns 429 with Retry-After header when the step is locked', async () => { + mockGetOperationMetadata.mockResolvedValue({ userId: 'user-1' }); + mockExecuteStep.mockResolvedValue({ + locked: true, + nextStepScheduled: false, + state: {}, + success: false, + }); + + const { ctx, getCaptures } = buildContext({ body: validBody }); + const res = await runStep(ctx); + + expect(res.status).toBe(429); + const captured = getCaptures()[0]; + expect(captured.body).toMatchObject({ + error: 'Step is currently being executed, retry later', + operationId: 'op-1', + stepIndex: 2, + }); + expect(captured.headers).toEqual({ 'Retry-After': '37' }); + }); + + it('forwards the upstash-retried header to executeStep as externalRetryCount', async () => { + mockGetOperationMetadata.mockResolvedValue({ userId: 'user-1' }); + mockExecuteStep.mockResolvedValue({ + nextStepScheduled: false, + state: { status: 'done', cost: { total: 0 }, stepCount: 1 }, + success: true, + }); + + const { ctx } = buildContext({ body: validBody, retried: '3' }); + await runStep(ctx); + + expect(mockExecuteStep).toHaveBeenCalledWith( + expect.objectContaining({ externalRetryCount: 3, operationId: 'op-1', stepIndex: 2 }), + ); + }); + + it('shapes the success response with status, totals and pending fields', async () => { + mockGetOperationMetadata.mockResolvedValue({ userId: 'user-1' }); + mockExecuteStep.mockResolvedValue({ + nextStepScheduled: true, + state: { + cost: { total: 0.42 }, + pendingHumanPrompt: { id: 'p1' }, + pendingHumanSelect: undefined, + pendingToolsCalling: ['t1'], + status: 'waiting_for_human', + stepCount: 5, + }, + success: true, + }); + + const { ctx, getCaptures } = buildContext({ body: validBody }); + const res = await runStep(ctx); + + expect(res.status).toBe(200); + expect(getCaptures()[0].body).toMatchObject({ + completed: false, + nextStepIndex: 3, + nextStepScheduled: true, + operationId: 'op-1', + pendingApproval: ['t1'], + pendingPrompt: { id: 'p1' }, + status: 'waiting_for_human', + stepIndex: 2, + success: true, + totalCost: 0.42, + totalSteps: 5, + waitingForHuman: true, + }); + }); + + it('returns 500 on unexpected service errors and echoes operationId', async () => { + mockGetOperationMetadata.mockResolvedValue({ userId: 'user-1' }); + mockExecuteStep.mockRejectedValue(new Error('boom')); + + const { ctx, getCaptures } = buildContext({ body: validBody }); + const res = await runStep(ctx); + + expect(res.status).toBe(500); + expect(getCaptures()[0].body).toMatchObject({ + error: 'boom', + operationId: 'op-1', + stepIndex: 2, + }); + }); +}); + +describe('runStepHealth handler', () => { + it('returns a healthy payload', () => { + const captures: any[] = []; + const ctx = { + json: (b: any, status = 200) => { + captures.push({ body: b, status }); + return Response.json(b, { status }); + }, + } as any; + + const res = runStepHealth(ctx); + + expect(res.status).toBe(200); + expect(captures[0].body).toMatchObject({ + healthy: true, + message: 'Agent execution service is running', + }); + }); +}); diff --git a/src/server/agent-hono/handlers/botCallback.ts b/src/server/agent-hono/handlers/botCallback.ts new file mode 100644 index 0000000000..e39d46d874 --- /dev/null +++ b/src/server/agent-hono/handlers/botCallback.ts @@ -0,0 +1,52 @@ +import debug from 'debug'; +import type { Context } from 'hono'; + +import { getServerDB } from '@/database/core/db-adaptor'; +import { BotCallbackService } from '@/server/services/bot/BotCallbackService'; + +const log = debug('lobe-server:agent:bot-callback'); + +/** + * Bot callback endpoint for agent step/completion webhooks. + * + * In queue mode, AgentRuntimeService fires webhooks (via QStash) after each step + * and on completion. This endpoint verifies the signature (via the `qstashAuth` + * middleware on the route) and delegates to BotCallbackService. + */ +export async function botCallback(c: Context): Promise { + let body: any; + try { + body = await c.req.json(); + } catch { + return c.json({ error: 'Invalid JSON body' }, 400); + } + + const { type, applicationId, platformThreadId, progressMessageId } = body; + + log( + 'bot-callback: type=%s, applicationId=%s, platformThreadId=%s, progressMessageId=%s', + type, + applicationId, + platformThreadId, + progressMessageId, + ); + + if (!type || !applicationId || !platformThreadId) { + return c.json({ error: 'Missing required fields: type, applicationId, platformThreadId' }, 400); + } + + if (type !== 'step' && type !== 'completion') { + return c.json({ error: `Unknown callback type: ${type}` }, 400); + } + + try { + const serverDB = await getServerDB(); + const service = new BotCallbackService(serverDB); + await service.handleCallback(body); + + return c.json({ success: true }); + } catch (error) { + console.error('bot-callback error:', error); + return c.json({ error: error instanceof Error ? error.message : 'Internal error' }, 500); + } +} diff --git a/src/app/(backend)/api/agent/gateway/callback/route.ts b/src/server/agent-hono/handlers/gatewayCallback.ts similarity index 62% rename from src/app/(backend)/api/agent/gateway/callback/route.ts rename to src/server/agent-hono/handlers/gatewayCallback.ts index b090140b8e..8d8a2cf06f 100644 --- a/src/app/(backend)/api/agent/gateway/callback/route.ts +++ b/src/server/agent-hono/handlers/gatewayCallback.ts @@ -1,5 +1,5 @@ import debug from 'debug'; -import { type NextRequest, NextResponse } from 'next/server'; +import type { Context } from 'hono'; import { z } from 'zod'; import { gatewayEnv } from '@/envs/gateway'; @@ -9,7 +9,7 @@ import { updateBotRuntimeStatus, } from '@/server/services/gateway/runtimeStatus'; -const log = debug('api-route:agent:gateway:callback'); +const log = debug('lobe-server:agent:gateway-callback'); const StateChangeSchema = z.object({ applicationId: z.string().optional(), @@ -23,49 +23,46 @@ const StateChangeSchema = z.object({ /** * Receive connection state-change callbacks from the external message gateway. - * When a persistent connection (e.g. Discord WebSocket) transitions to - * "connected" or "error" asynchronously, the gateway POSTs here so LobeHub - * can update the bot runtime status visible to users. + * Authenticated with `MESSAGE_GATEWAY_SERVICE_TOKEN`. * - * Authenticated with MESSAGE_GATEWAY_SERVICE_TOKEN. + * Auth is inline (not a route-level middleware) because the disabled-feature + * 204 short-circuit must run *before* the auth check — when the gateway is + * off we silently no-op rather than 401 stale callers. */ -export async function POST(request: NextRequest) { +export async function gatewayCallback(c: Context): Promise { // Ignore callbacks when gateway is disabled — connections are managed locally, // and stale gateway callbacks (e.g. from disconnectAll during migration) could // overwrite locally-managed status. if (gatewayEnv.MESSAGE_GATEWAY_ENABLED !== '1') { - return new NextResponse(null, { status: 204 }); + return c.body(null, 204); } const serviceToken = gatewayEnv.MESSAGE_GATEWAY_SERVICE_TOKEN; if (!serviceToken) { - return NextResponse.json({ error: 'Service not configured' }, { status: 503 }); + return c.json({ error: 'Service not configured' }, 503); } - const authHeader = request.headers.get('authorization'); + const authHeader = c.req.header('authorization'); if (authHeader !== `Bearer ${serviceToken}`) { - return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }); + return c.json({ error: 'Unauthorized' }, 401); } let parsed; try { - const body = await request.json(); + const body = await c.req.json(); parsed = StateChangeSchema.safeParse(body); } catch { - return NextResponse.json({ error: 'Invalid JSON' }, { status: 400 }); + return c.json({ error: 'Invalid JSON' }, 400); } if (!parsed.success) { - return NextResponse.json( - { error: 'Invalid body', issues: parsed.error.issues }, - { status: 400 }, - ); + return c.json({ error: 'Invalid body', issues: parsed.error.issues }, 400); } const { applicationId, platform, state } = parsed.data; if (!applicationId) { - return new NextResponse(null, { status: 204 }); + return c.body(null, 204); } const statusMap: Partial> = { @@ -78,7 +75,7 @@ export async function POST(request: NextRequest) { const runtimeStatus = statusMap[state.status]; if (!runtimeStatus) { // "connecting" — no status update needed - return new NextResponse(null, { status: 204 }); + return c.body(null, 204); } await updateBotRuntimeStatus({ @@ -90,5 +87,5 @@ export async function POST(request: NextRequest) { log('Updated %s:%s → %s', platform, applicationId, runtimeStatus); - return new NextResponse(null, { status: 204 }); + return c.body(null, 204); } diff --git a/src/app/(backend)/api/agent/gateway/route.ts b/src/server/agent-hono/handlers/gatewayCron.ts similarity index 93% rename from src/app/(backend)/api/agent/gateway/route.ts rename to src/server/agent-hono/handlers/gatewayCron.ts index 7a4b2a1a95..19f7347090 100644 --- a/src/app/(backend)/api/agent/gateway/route.ts +++ b/src/server/agent-hono/handlers/gatewayCron.ts @@ -1,5 +1,5 @@ import debug from 'debug'; -import type { NextRequest } from 'next/server'; +import type { Context } from 'hono'; import { after } from 'next/server'; import { getServerDB } from '@/database/core/db-adaptor'; @@ -125,12 +125,14 @@ async function processConnectQueue(remainingMs: number): Promise { return processed; } -export async function GET(request: NextRequest) { - const authHeader = request.headers.get('authorization'); - if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) { - return new Response('Unauthorized', { status: 401 }); - } - +/** + * Cron-driven gateway entry point. Runs once per Vercel cron tick and keeps + * persistent bot connections alive for a 10-minute window via `next/server`'s + * `after()`. + * + * Auth: `bearerSecretAuth(CRON_SECRET)` on the route. + */ +export async function gatewayCron(c: Context): Promise { // When the external message gateway is enabled, sync connections via gateway. if (process.env.MESSAGE_GATEWAY_URL && process.env.MESSAGE_GATEWAY_SERVICE_TOKEN) { const { GatewayService } = await import('@/server/services/gateway'); @@ -138,7 +140,7 @@ export async function GET(request: NextRequest) { if (service.useMessageGateway) { await service.ensureRunning(); - return Response.json({ ensureRunning: true }); + return c.json({ ensureRunning: true }); } } @@ -215,5 +217,5 @@ export async function GET(request: NextRequest) { } }); - return Response.json({ platforms: stats, queued, started, total }); + return c.json({ platforms: stats, queued, started, total }); } diff --git a/src/server/agent-hono/handlers/gatewayStart.ts b/src/server/agent-hono/handlers/gatewayStart.ts new file mode 100644 index 0000000000..39e93c3830 --- /dev/null +++ b/src/server/agent-hono/handlers/gatewayStart.ts @@ -0,0 +1,29 @@ +import type { Context } from 'hono'; + +import { GatewayService } from '@/server/services/gateway'; + +/** + * Non-Vercel `ensureRunning` entry point — used by the standalone server + * launcher (`scripts/serverLauncher/startServer.js`). Body: `{ restart?: boolean }`. + * + * Auth: `bearerSecretAuth(KEY_VAULTS_SECRET)` on the route. + */ +export async function gatewayStart(c: Context): Promise { + const body = await c.req.json().catch(() => ({}) as Record); + const service = new GatewayService(); + + try { + if ((body as { restart?: boolean }).restart) { + console.info('[GatewayService] Restarting...'); + await service.stop(); + } + + await service.ensureRunning(); + console.info('[GatewayService] Started successfully'); + + return c.json({ status: (body as { restart?: boolean }).restart ? 'restarted' : 'started' }); + } catch (error) { + console.error('[GatewayService] Failed to start:', error); + return c.json({ error: 'Failed to start gateway' }, 500); + } +} diff --git a/src/server/agent-hono/handlers/platformWebhook.ts b/src/server/agent-hono/handlers/platformWebhook.ts new file mode 100644 index 0000000000..7a687ee06c --- /dev/null +++ b/src/server/agent-hono/handlers/platformWebhook.ts @@ -0,0 +1,30 @@ +import debug from 'debug'; +import type { Context } from 'hono'; + +import { getBotMessageRouter } from '@/server/services/bot'; + +const log = debug('lobe-server:bot:webhook-route'); + +/** + * Unified webhook endpoint for Chat SDK bot platforms. Handles both: + * - POST /api/agent/webhooks/:platform + * - POST /api/agent/webhooks/:platform/:appId + * + * Hono receives the raw `Request` via `c.req.raw` and forwards it to the + * platform-specific handler returned by the bot message router (the platform + * is responsible for verifying its own signature). + */ +export async function platformWebhook(c: Context): Promise { + const platform = c.req.param('platform'); + const appId = c.req.param('appId'); + + if (!platform) { + return c.json({ error: 'platform is required' }, 400); + } + + log('Received webhook: platform=%s, appId=%s, url=%s', platform, appId ?? '(none)', c.req.url); + + const router = getBotMessageRouter(); + const handler = router.getWebhookHandler(platform, appId); + return handler(c.req.raw); +} diff --git a/src/app/(backend)/api/agent/run/route.ts b/src/server/agent-hono/handlers/runStep.ts similarity index 61% rename from src/app/(backend)/api/agent/run/route.ts rename to src/server/agent-hono/handlers/runStep.ts index ebb4557401..43b528b6ed 100644 --- a/src/app/(backend)/api/agent/run/route.ts +++ b/src/server/agent-hono/handlers/runStep.ts @@ -1,29 +1,30 @@ import debug from 'debug'; -import { type NextRequest } from 'next/server'; -import { NextResponse } from 'next/server'; +import type { Context } from 'hono'; import { getServerDB } from '@/database/core/db-adaptor'; -import { verifyQStashSignature } from '@/libs/qstash'; import { AgentRuntimeCoordinator } from '@/server/modules/AgentRuntime'; import { AgentRuntimeService } from '@/server/services/agentRuntime'; -const log = debug('api-route:agent:execute-step'); +const log = debug('lobe-server:agent:run-step'); -export async function POST(request: NextRequest) { +/** + * Execute a single agent step. Invoked by QStash with the body + * `{ operationId, stepIndex, context, humanInput?, approvedToolCall?, ... }`. + * + * Auth: `qstashAuth` on the route — QStash signature required. + */ +export async function runStep(c: Context): Promise { const startTime = Date.now(); - // Read raw body for signature verification (must be done before parsing JSON) - const rawBody = await request.text(); - - // Verify QStash signature - const isValidSignature = await verifyQStashSignature(request, rawBody); - if (!isValidSignature) { - return NextResponse.json({ error: 'Invalid signature' }, { status: 401 }); + let body: any; + try { + body = await c.req.json(); + } catch { + return c.json({ error: 'Invalid JSON body' }, 400); } - // Parse body after verification - const body = JSON.parse(rawBody); - const externalRetryCount = Number(request.headers.get('Upstash-Retried') ?? 0) || 0; + const externalRetryCount = Number(c.req.header('upstash-retried') ?? 0) || 0; + try { const { operationId, @@ -37,7 +38,7 @@ export async function POST(request: NextRequest) { } = body; if (!operationId) { - return NextResponse.json({ error: 'operationId is required' }, { status: 400 }); + return c.json({ error: 'operationId is required' }, 400); } log(`[${operationId}] Starting step ${stepIndex}`); @@ -48,14 +49,12 @@ export async function POST(request: NextRequest) { if (!metadata?.userId) { log(`[${operationId}] Invalid operation or no userId found`); - return NextResponse.json({ error: 'Invalid operation or unauthorized' }, { status: 401 }); + return c.json({ error: 'Invalid operation or unauthorized' }, 401); } - // Initialize service with userId from operation metadata const serverDB = await getServerDB(); const agentRuntimeService = new AgentRuntimeService(serverDB, metadata.userId); - // Execute step using AgentRuntimeService const result = await agentRuntimeService.executeStep({ approvedToolCall, context, @@ -71,14 +70,10 @@ export async function POST(request: NextRequest) { // Step is currently being executed by another instance — tell QStash to retry later if (result.locked) { log(`[${operationId}] Step ${stepIndex} locked by another instance, returning 429`); - return NextResponse.json( + return c.json( { error: 'Step is currently being executed, retry later', operationId, stepIndex }, - { - status: 429, - headers: { - 'Retry-After': '37', // unit: seconds - }, - }, + 429, + { 'Retry-After': '37' }, ); } @@ -106,41 +101,30 @@ export async function POST(request: NextRequest) { `[${operationId}] Step ${stepIndex} completed (${executionTime}ms, status: ${result.state.status})`, ); - return NextResponse.json(responseData); + return c.json(responseData); } catch (error: any) { const executionTime = Date.now() - startTime; console.error('Error in execution: %O', error); - return NextResponse.json( + return c.json( { error: error.message, executionTime, operationId: body?.operationId, stepIndex: body?.stepIndex || 0, }, - { status: 500 }, + 500, ); } } /** - * Health check endpoint + * Health check for the agent execution path. */ -export async function GET() { - try { - return NextResponse.json({ - healthy: true, - message: 'Agent execution service is running', - timestamp: new Date().toISOString(), - }); - } catch (error: any) { - return NextResponse.json( - { - error: error.message, - healthy: false, - timestamp: new Date().toISOString(), - }, - { status: 503 }, - ); - } +export function runStepHealth(c: Context): Response { + return c.json({ + healthy: true, + message: 'Agent execution service is running', + timestamp: new Date().toISOString(), + }); } diff --git a/src/server/agent-hono/index.ts b/src/server/agent-hono/index.ts index db1123c321..0ac3b0dbae 100644 --- a/src/server/agent-hono/index.ts +++ b/src/server/agent-hono/index.ts @@ -1,8 +1,16 @@ import { Hono } from 'hono'; +import { botCallback } from './handlers/botCallback'; import { execAgent } from './handlers/execAgent'; import { finalizeAbandoned } from './handlers/finalizeAbandoned'; +import { gatewayCallback } from './handlers/gatewayCallback'; +import { gatewayCron } from './handlers/gatewayCron'; +import { gatewayStart } from './handlers/gatewayStart'; +import { platformWebhook } from './handlers/platformWebhook'; +import { runStep, runStepHealth } from './handlers/runStep'; import { toolResult } from './handlers/toolResult'; +import { bearerSecretAuth } from './middlewares/bearerSecretAuth'; +import { qstashAuth } from './middlewares/qstashAuth'; import { qstashOrApiKeyAuth } from './middlewares/qstashOrApiKeyAuth'; import { serviceTokenAuth } from './middlewares/serviceTokenAuth'; @@ -10,9 +18,8 @@ import { serviceTokenAuth } from './middlewares/serviceTokenAuth'; * Hono app for `/api/agent/*` endpoints. Mounted via the Next.js optional * catch-all at `src/app/(backend)/api/agent/[[...route]]/route.ts`. * - * Routing precedence: existing static `route.ts` files (e.g. `run/route.ts`, - * `stream/route.ts`, `gateway/*`, `webhooks/*`) win over the catch-all, so - * individual paths can migrate one at a time — delete the static `route.ts` + * Routing precedence: existing static `route.ts` files win over the catch-all, + * so individual paths can migrate one at a time — delete the static `route.ts` * and add the corresponding handler here. */ const app = new Hono().basePath('/api/agent'); @@ -20,6 +27,10 @@ const app = new Hono().basePath('/api/agent'); // POST /api/agent — start a new agent operation (QStash sig OR API key) app.post('/', qstashOrApiKeyAuth(), execAgent); +// POST /api/agent/run — execute a single step (QStash signature) +app.post('/run', qstashAuth(), runStep); +app.get('/run', runStepHealth); + // POST /api/agent/tool-result — gateway-side tool result LPUSH'd to Redis app.post('/tool-result', serviceTokenAuth(), toolResult); @@ -33,4 +44,28 @@ app.get('/finalize-abandoned', (c) => }), ); +// GET /api/agent/gateway — Vercel cron entry point (Bearer CRON_SECRET) +app.get( + '/gateway', + bearerSecretAuth(() => process.env.CRON_SECRET), + gatewayCron, +); + +// POST /api/agent/gateway/start — non-Vercel ensureRunning (Bearer KEY_VAULTS_SECRET) +app.post( + '/gateway/start', + bearerSecretAuth(() => process.env.KEY_VAULTS_SECRET), + gatewayStart, +); + +// POST /api/agent/gateway/callback — message gateway state-change callbacks +// (auth is inline so the disabled-feature 204 short-circuits before auth) +app.post('/gateway/callback', gatewayCallback); + +// POST /api/agent/webhooks/bot-callback — agent step/completion webhooks (QStash) +app.post('/webhooks/bot-callback', qstashAuth(), botCallback); + +// POST /api/agent/webhooks/:platform[/:appId] — Chat SDK bot platform webhooks +app.post('/webhooks/:platform/:appId?', platformWebhook); + export default app; diff --git a/src/server/agent-hono/middlewares/__tests__/bearerSecretAuth.test.ts b/src/server/agent-hono/middlewares/__tests__/bearerSecretAuth.test.ts new file mode 100644 index 0000000000..25426ae4e4 --- /dev/null +++ b/src/server/agent-hono/middlewares/__tests__/bearerSecretAuth.test.ts @@ -0,0 +1,83 @@ +// @vitest-environment node +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import { bearerSecretAuth } from '../bearerSecretAuth'; + +function buildContext(authHeader?: string) { + let captured: { body: any; status: number } | undefined; + const ctx = { + json: (b: any, status = 200) => { + captured = { body: b, status }; + return Response.json(b, { status }); + }, + req: { + header: (name: string) => (name.toLowerCase() === 'authorization' ? authHeader : undefined), + path: '/api/agent/gateway', + }, + } as any; + return { ctx, getCaptured: () => captured }; +} + +describe('bearerSecretAuth middleware', () => { + afterEach(() => { + vi.clearAllMocks(); + }); + + it('returns 503 when the secret is unset', async () => { + const next = vi.fn(); + const { ctx, getCaptured } = buildContext('Bearer anything'); + + const res = await bearerSecretAuth(() => undefined)(ctx, next); + + expect(res?.status).toBe(503); + expect(getCaptured()?.body).toEqual({ error: 'Service not configured' }); + expect(next).not.toHaveBeenCalled(); + }); + + it('returns 401 when the authorization header is missing', async () => { + const next = vi.fn(); + const { ctx, getCaptured } = buildContext(undefined); + + const res = await bearerSecretAuth(() => 'shh')(ctx, next); + + expect(res?.status).toBe(401); + expect(getCaptured()?.body).toEqual({ error: 'Unauthorized' }); + expect(next).not.toHaveBeenCalled(); + }); + + it('returns 401 when the authorization header does not match', async () => { + const next = vi.fn(); + const { ctx } = buildContext('Bearer wrong'); + + const res = await bearerSecretAuth(() => 'shh')(ctx, next); + + expect(res?.status).toBe(401); + expect(next).not.toHaveBeenCalled(); + }); + + it('calls next when the bearer token matches', async () => { + const next = vi.fn().mockResolvedValue(undefined); + const { ctx } = buildContext('Bearer shh'); + + await bearerSecretAuth(() => 'shh')(ctx, next); + + expect(next).toHaveBeenCalledTimes(1); + }); + + it('evaluates the secret getter on each request (no cache)', async () => { + const getSecret = vi.fn(); + getSecret.mockReturnValueOnce(undefined).mockReturnValueOnce('s'); + + const mw = bearerSecretAuth(getSecret); + + // first call: secret unset → 503 + const first = await mw(buildContext('Bearer s').ctx, vi.fn()); + expect(first?.status).toBe(503); + + // second call: secret now set → 200/next + const next = vi.fn().mockResolvedValue(undefined); + await mw(buildContext('Bearer s').ctx, next); + expect(next).toHaveBeenCalledTimes(1); + expect(getSecret).toHaveBeenCalledTimes(2); + }); +}); diff --git a/src/server/agent-hono/middlewares/__tests__/qstashAuth.test.ts b/src/server/agent-hono/middlewares/__tests__/qstashAuth.test.ts new file mode 100644 index 0000000000..d194cc7672 --- /dev/null +++ b/src/server/agent-hono/middlewares/__tests__/qstashAuth.test.ts @@ -0,0 +1,64 @@ +// @vitest-environment node +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +// Imported after the mock so the middleware picks up the stubbed verifier. +import { qstashAuth } from '../qstashAuth'; + +const mockVerify = vi.fn<(req: Request, body: string) => Promise>(); +vi.mock('@/libs/qstash', () => ({ + verifyQStashSignature: (req: Request, body: string) => mockVerify(req, body), +})); + +function buildContext(rawBody: string) { + const rawRequest = new Request('http://x/api/agent/run', { + method: 'POST', + body: rawBody, + }); + let captured: { body: any; status: number } | undefined; + const ctx = { + json: (b: any, status = 200) => { + captured = { body: b, status }; + return Response.json(b, { status }); + }, + req: { + path: '/api/agent/run', + raw: rawRequest, + text: async () => rawBody, + }, + } as any; + return { ctx, getCaptured: () => captured }; +} + +describe('qstashAuth middleware', () => { + beforeEach(() => { + mockVerify.mockReset(); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it('passes the raw body and request to verifyQStashSignature', async () => { + mockVerify.mockResolvedValue(true); + const next = vi.fn().mockResolvedValue(undefined); + const { ctx } = buildContext('{"hello":"world"}'); + + await qstashAuth()(ctx, next); + + expect(mockVerify).toHaveBeenCalledTimes(1); + expect(mockVerify).toHaveBeenCalledWith(ctx.req.raw, '{"hello":"world"}'); + expect(next).toHaveBeenCalledTimes(1); + }); + + it('rejects with 401 when signature is invalid', async () => { + mockVerify.mockResolvedValue(false); + const next = vi.fn(); + const { ctx, getCaptured } = buildContext('{"hello":"world"}'); + + const res = await qstashAuth()(ctx, next); + + expect(res?.status).toBe(401); + expect(getCaptured()?.body).toEqual({ error: 'Invalid signature' }); + expect(next).not.toHaveBeenCalled(); + }); +}); diff --git a/src/server/agent-hono/middlewares/bearerSecretAuth.ts b/src/server/agent-hono/middlewares/bearerSecretAuth.ts new file mode 100644 index 0000000000..1b31010f30 --- /dev/null +++ b/src/server/agent-hono/middlewares/bearerSecretAuth.ts @@ -0,0 +1,31 @@ +import debug from 'debug'; +import type { MiddlewareHandler } from 'hono'; + +const log = debug('lobe-server:agent:bearer-secret-auth'); + +/** + * Hono middleware factory that requires `Authorization: Bearer ` + * matching a runtime-evaluated secret. + * + * - `getSecret` is invoked per request (do not capture the value at module + * load time — env vars may not be populated yet). + * - Returns `503` when the secret is unset (matches the existing + * `serviceTokenAuth` behavior). + * - Returns `401` on header mismatch. + */ +export const bearerSecretAuth = + (getSecret: () => string | undefined): MiddlewareHandler => + async (c, next) => { + const secret = getSecret(); + if (!secret) { + log('Secret is not configured for %s', c.req.path); + return c.json({ error: 'Service not configured' }, 503); + } + + const authHeader = c.req.header('authorization'); + if (authHeader !== `Bearer ${secret}`) { + return c.json({ error: 'Unauthorized' }, 401); + } + + await next(); + }; diff --git a/src/server/agent-hono/middlewares/qstashAuth.ts b/src/server/agent-hono/middlewares/qstashAuth.ts new file mode 100644 index 0000000000..96aabcc659 --- /dev/null +++ b/src/server/agent-hono/middlewares/qstashAuth.ts @@ -0,0 +1,25 @@ +import debug from 'debug'; +import type { MiddlewareHandler } from 'hono'; + +import { verifyQStashSignature } from '@/libs/qstash'; + +const log = debug('lobe-server:agent:qstash-auth'); + +/** + * Hono middleware that requires a valid QStash signature on the request. + * + * The body is consumed via `c.req.text()` to compute the QStash HMAC; + * downstream handlers can still call `c.req.json()` thanks to Hono's + * bodyCache cross-conversion. + */ +export const qstashAuth = (): MiddlewareHandler => async (c, next) => { + const rawBody = await c.req.text(); + const isValid = await verifyQStashSignature(c.req.raw, rawBody); + + if (!isValid) { + log('Rejected: invalid QStash signature on %s', c.req.path); + return c.json({ error: 'Invalid signature' }, 401); + } + + await next(); +};