mirror of
https://github.com/lobehub/lobe-chat.git
synced 2026-06-13 19:20:04 +00:00
♻️ refactor(agent): migrate remaining /api/agent routes to Hono (#14478)
* ♻️ refactor(agent): migrate remaining /api/agent routes to Hono Move the static `route.ts` handlers under `src/app/(backend)/api/agent/` into the existing Hono app at `src/server/agent-hono/`, leaving only the SSE `stream` endpoint as a Next.js route. Behavior, URLs, and auth semantics are unchanged. - New middlewares: `qstashAuth` (QStash sig only) and `bearerSecretAuth` (factory for arbitrary `Bearer <secret>` checks) - Migrated handlers: `run`, `webhooks/bot-callback`, `gateway`, `gateway/start`, `gateway/callback`, `webhooks/[platform]/[[...appId]]` - `gateway/callback` keeps inline auth so the disabled-feature 204 still short-circuits before any auth check - `gatewayCron` keeps `next/server`'s `after()` for the 10-min poll loop Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * 🧪 test(agent-hono): cover migrated route handlers and new middlewares Add unit tests for the handlers and middlewares introduced by the /api/agent → Hono migration. Each test uses the same hand-built Hono Context stub pattern as `toolResult.test.ts` (vitest can't resolve the hoisted `hono` package, so a real Hono Context isn't available in tests). Coverage: - middlewares/qstashAuth (sig pass/fail → next called/not, body forwarded to verifier) - middlewares/bearerSecretAuth (503/401/200 paths, lazy secret eval) - handlers/runStep (validation, lock 429 + Retry-After, success shape, upstash-retried header forwarding) - handlers/botCallback (validation + service delegation + 500 on throw) - handlers/gatewayCallback (disabled-feature 204, auth, zod validation, state.status → BotRuntimeStatus mapping) - handlers/gatewayStart (start/restart paths, stop-before-ensure ordering, 500 on failure) - handlers/platformWebhook (param validation, raw request passthrough) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -76,7 +76,7 @@ The router caches loaded bots in memory. Cache is **invalidated** by `BotMessage
|
||||
2. Calls `execAgent` with `stepWebhook` and `completionWebhook` pointing at `${INTERNAL_APP_URL ?? APP_URL}/api/agent/webhooks/bot-callback`, plus `webhookDelivery: 'qstash'`.
|
||||
3. Returns immediately; the bridge `finally` block keeps the active-thread marker held until the `completion` callback fires.
|
||||
|
||||
`/api/agent/webhooks/bot-callback/route.ts` verifies the QStash signature and hands off to `BotCallbackService.handleCallback`:
|
||||
`POST /api/agent/webhooks/bot-callback` (`src/server/agent-hono/handlers/botCallback.ts`) verifies the QStash signature via the `qstashAuth` middleware and hands off to `BotCallbackService.handleCallback`:
|
||||
|
||||
- `type: 'step'` → `handleStep` re-renders `renderStepProgress`, edits `progressMessageId` (skipped if `displayToolCalls=false` or platform `supportsMessageEdit=false`).
|
||||
- `type: 'completion'` → `handleCompletion` writes the final reply (or error/interrupted message), removes the 👀 reaction, clears active-thread tracker, fires async `summarizeTopicTitle`.
|
||||
@@ -140,12 +140,12 @@ Webhook platforms run fine in serverless functions. Persistent platforms (`webso
|
||||
- On Vercel + webhook mode → start the client inline (one HTTP call).
|
||||
- Off-Vercel → `GatewayManager` singleton holds long-lived clients in process.
|
||||
|
||||
**`GET /api/agent/gateway/route.ts`** (cron, `Bearer ${CRON_SECRET}`):
|
||||
**`GET /api/agent/gateway`** (`src/server/agent-hono/handlers/gatewayCron.ts`, cron, `Bearer ${CRON_SECRET}`):
|
||||
|
||||
- Iterates registered platforms and starts every enabled persistent provider with `durationMs = 10min`, then in `after(...)` polls `BotConnectQueue` every 30s for new connect requests, until the window expires.
|
||||
- `getEffectiveConnectionMode(platform, settings)` is the only place that resolves per-provider mode — respect it everywhere.
|
||||
|
||||
**`POST /api/agent/gateway/start/route.ts`** is the non-Vercel `ensureRunning` entry point (`Bearer ${KEY_VAULTS_SECRET}`).
|
||||
**`POST /api/agent/gateway/start`** (`src/server/agent-hono/handlers/gatewayStart.ts`) is the non-Vercel `ensureRunning` entry point (`Bearer ${KEY_VAULTS_SECRET}`).
|
||||
|
||||
**Runtime status** is stored in Redis at `bot:runtime-status:platform:appId` with TTL ≈ `durationMs + 60s`. States: `starting | connected | disconnected | failed | queued`. Updated by each `PlatformClient.start/stop` and by the gateway service.
|
||||
|
||||
@@ -226,11 +226,11 @@ Client service: `src/services/agentBotProvider.ts`. Store actions: `src/store/ag
|
||||
## Key Files
|
||||
|
||||
```plaintext
|
||||
Webhook routes:
|
||||
src/app/(backend)/api/agent/webhooks/[platform]/[[...appId]]/route.ts — inbound catch-all
|
||||
src/app/(backend)/api/agent/webhooks/bot-callback/route.ts — qstash bot callback
|
||||
src/app/(backend)/api/agent/gateway/route.ts — cron gateway (10min window)
|
||||
src/app/(backend)/api/agent/gateway/start/route.ts — non-Vercel ensureRunning
|
||||
Webhook routes (mounted via `src/app/(backend)/api/agent/[[...route]]/route.ts` → `src/server/agent-hono`):
|
||||
src/server/agent-hono/handlers/platformWebhook.ts — inbound catch-all (POST /webhooks/:platform/:appId?)
|
||||
src/server/agent-hono/handlers/botCallback.ts — qstash bot callback
|
||||
src/server/agent-hono/handlers/gatewayCron.ts — cron gateway (10min window)
|
||||
src/server/agent-hono/handlers/gatewayStart.ts — non-Vercel ensureRunning
|
||||
|
||||
Bot service:
|
||||
src/server/services/bot/index.ts — barrel
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
import { NextResponse } from 'next/server';
|
||||
|
||||
import { getServerDBConfig } from '@/config/db';
|
||||
import { GatewayService } from '@/server/services/gateway';
|
||||
|
||||
export const POST = async (req: Request): Promise<Response> => {
|
||||
const { KEY_VAULTS_SECRET } = getServerDBConfig();
|
||||
|
||||
const authHeader = req.headers.get('authorization');
|
||||
if (authHeader !== `Bearer ${KEY_VAULTS_SECRET}`) {
|
||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
|
||||
}
|
||||
|
||||
const body = await req.json().catch(() => ({}));
|
||||
const service = new GatewayService();
|
||||
|
||||
try {
|
||||
if (body.restart) {
|
||||
console.info('[GatewayService] Restarting...');
|
||||
await service.stop();
|
||||
}
|
||||
|
||||
await service.ensureRunning();
|
||||
console.info('[GatewayService] Started successfully');
|
||||
|
||||
return NextResponse.json({ status: body.restart ? 'restarted' : 'started' });
|
||||
} catch (error) {
|
||||
console.error('[GatewayService] Failed to start:', error);
|
||||
return NextResponse.json({ error: 'Failed to start gateway' }, { status: 500 });
|
||||
}
|
||||
};
|
||||
@@ -1,30 +0,0 @@
|
||||
import debug from 'debug';
|
||||
|
||||
import { getBotMessageRouter } from '@/server/services/bot';
|
||||
|
||||
const log = debug('lobe-server:bot:webhook-route');
|
||||
|
||||
/**
|
||||
* Unified webhook endpoint for Chat SDK bot platforms.
|
||||
*
|
||||
* Handles both generic and bot-specific webhook URLs:
|
||||
* - POST /api/agent/webhooks/[platform]
|
||||
* - POST /api/agent/webhooks/[platform]/[appId]
|
||||
*
|
||||
* Using an optional catch-all `[[...appId]]` ensures both patterns are served
|
||||
* by a single serverless function, avoiding deployment issues with nested
|
||||
* dynamic segments on Vercel.
|
||||
*/
|
||||
export const POST = async (
|
||||
req: Request,
|
||||
{ params }: { params: Promise<{ appId?: string[]; platform: string }> },
|
||||
): Promise<Response> => {
|
||||
const { platform, appId: appIdSegments } = await params;
|
||||
const appId = appIdSegments?.[0];
|
||||
|
||||
log('Received webhook: platform=%s, appId=%s, url=%s', platform, appId ?? '(none)', req.url);
|
||||
|
||||
const router = getBotMessageRouter();
|
||||
const handler = router.getWebhookHandler(platform, appId);
|
||||
return handler(req);
|
||||
};
|
||||
@@ -1,64 +0,0 @@
|
||||
import debug from 'debug';
|
||||
import { NextResponse } from 'next/server';
|
||||
|
||||
import { getServerDB } from '@/database/core/db-adaptor';
|
||||
import { verifyQStashSignature } from '@/libs/qstash';
|
||||
import { BotCallbackService } from '@/server/services/bot/BotCallbackService';
|
||||
|
||||
const log = debug('api-route:agent:bot-callback');
|
||||
|
||||
/**
|
||||
* Bot callback endpoint for agent step/completion webhooks.
|
||||
*
|
||||
* In queue mode, AgentRuntimeService fires webhooks (via QStash) after each step
|
||||
* and on completion. This endpoint verifies the signature and delegates to BotCallbackService.
|
||||
*
|
||||
* Route: POST /api/agent/webhooks/bot-callback
|
||||
*/
|
||||
export async function POST(request: Request): Promise<Response> {
|
||||
const rawBody = await request.text();
|
||||
|
||||
const isValid = await verifyQStashSignature(request, rawBody);
|
||||
if (!isValid) {
|
||||
return NextResponse.json({ error: 'Invalid signature' }, { status: 401 });
|
||||
}
|
||||
|
||||
const body = JSON.parse(rawBody);
|
||||
|
||||
const { type, applicationId, platformThreadId, progressMessageId } = body;
|
||||
|
||||
log(
|
||||
'bot-callback: type=%s, applicationId=%s, platformThreadId=%s, progressMessageId=%s',
|
||||
type,
|
||||
applicationId,
|
||||
platformThreadId,
|
||||
progressMessageId,
|
||||
);
|
||||
|
||||
if (!type || !applicationId || !platformThreadId) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'Missing required fields: type, applicationId, platformThreadId',
|
||||
},
|
||||
{ status: 400 },
|
||||
);
|
||||
}
|
||||
|
||||
if (type !== 'step' && type !== 'completion') {
|
||||
return NextResponse.json({ error: `Unknown callback type: ${type}` }, { status: 400 });
|
||||
}
|
||||
|
||||
try {
|
||||
const serverDB = await getServerDB();
|
||||
const service = new BotCallbackService(serverDB);
|
||||
await service.handleCallback(body);
|
||||
|
||||
return NextResponse.json({ success: true });
|
||||
} catch (error) {
|
||||
console.error('bot-callback error:', error);
|
||||
return NextResponse.json(
|
||||
{ error: error instanceof Error ? error.message : 'Internal error' },
|
||||
{ status: 500 },
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,110 @@
|
||||
// @vitest-environment node
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { botCallback } from '../botCallback';
|
||||
|
||||
const mockHandleCallback = vi.fn();
|
||||
|
||||
vi.mock('@/server/services/bot/BotCallbackService', () => ({
|
||||
BotCallbackService: vi.fn().mockImplementation(() => ({
|
||||
handleCallback: mockHandleCallback,
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock('@/database/core/db-adaptor', () => ({
|
||||
getServerDB: vi.fn().mockResolvedValue({} as any),
|
||||
}));
|
||||
|
||||
function buildContext(opts: { body?: unknown; jsonThrows?: boolean }) {
|
||||
const captures: Array<{ body: any; status: number }> = [];
|
||||
const ctx = {
|
||||
json: (b: any, status = 200) => {
|
||||
captures.push({ body: b, status });
|
||||
return Response.json(b, { status });
|
||||
},
|
||||
req: {
|
||||
json: opts.jsonThrows
|
||||
? async () => {
|
||||
throw new Error('bad json');
|
||||
}
|
||||
: async () => opts.body,
|
||||
},
|
||||
} as any;
|
||||
return { ctx, getCaptures: () => captures };
|
||||
}
|
||||
|
||||
const validStepBody = {
|
||||
applicationId: 'app-1',
|
||||
platformThreadId: 'thread-1',
|
||||
progressMessageId: 'msg-1',
|
||||
type: 'step',
|
||||
};
|
||||
|
||||
describe('botCallback handler', () => {
|
||||
beforeEach(() => {
|
||||
mockHandleCallback.mockReset();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it('returns 400 when JSON parsing throws', async () => {
|
||||
const { ctx } = buildContext({ jsonThrows: true });
|
||||
const res = await botCallback(ctx);
|
||||
expect(res.status).toBe(400);
|
||||
expect(mockHandleCallback).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it.each([
|
||||
['type', { ...validStepBody, type: undefined }],
|
||||
['applicationId', { ...validStepBody, applicationId: undefined }],
|
||||
['platformThreadId', { ...validStepBody, platformThreadId: undefined }],
|
||||
])('returns 400 when required field %s is missing', async (_field, body) => {
|
||||
const { ctx, getCaptures } = buildContext({ body });
|
||||
const res = await botCallback(ctx);
|
||||
expect(res.status).toBe(400);
|
||||
expect(getCaptures()[0].body.error).toMatch(/Missing required fields/);
|
||||
expect(mockHandleCallback).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('returns 400 for unknown callback types', async () => {
|
||||
const { ctx, getCaptures } = buildContext({
|
||||
body: { ...validStepBody, type: 'unknown' },
|
||||
});
|
||||
const res = await botCallback(ctx);
|
||||
expect(res.status).toBe(400);
|
||||
expect(getCaptures()[0].body.error).toBe('Unknown callback type: unknown');
|
||||
});
|
||||
|
||||
it('delegates to BotCallbackService and returns 200 on happy path', async () => {
|
||||
mockHandleCallback.mockResolvedValue(undefined);
|
||||
const { ctx, getCaptures } = buildContext({ body: validStepBody });
|
||||
|
||||
const res = await botCallback(ctx);
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(getCaptures()[0].body).toEqual({ success: true });
|
||||
expect(mockHandleCallback).toHaveBeenCalledWith(validStepBody);
|
||||
});
|
||||
|
||||
it('accepts type=completion', async () => {
|
||||
mockHandleCallback.mockResolvedValue(undefined);
|
||||
const body = { ...validStepBody, type: 'completion' };
|
||||
const { ctx } = buildContext({ body });
|
||||
|
||||
const res = await botCallback(ctx);
|
||||
expect(res.status).toBe(200);
|
||||
expect(mockHandleCallback).toHaveBeenCalledWith(body);
|
||||
});
|
||||
|
||||
it('returns 500 with the error message when the service throws', async () => {
|
||||
mockHandleCallback.mockRejectedValue(new Error('service down'));
|
||||
const { ctx, getCaptures } = buildContext({ body: validStepBody });
|
||||
|
||||
const res = await botCallback(ctx);
|
||||
|
||||
expect(res.status).toBe(500);
|
||||
expect(getCaptures()[0].body).toEqual({ error: 'service down' });
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,160 @@
|
||||
// @vitest-environment node
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { gatewayCallback } from '../gatewayCallback';
|
||||
|
||||
const { gatewayEnvState, mockUpdateBotRuntimeStatus } = vi.hoisted(() => ({
|
||||
gatewayEnvState: {} as {
|
||||
MESSAGE_GATEWAY_ENABLED?: string;
|
||||
MESSAGE_GATEWAY_SERVICE_TOKEN?: string;
|
||||
},
|
||||
mockUpdateBotRuntimeStatus: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('@/envs/gateway', () => ({
|
||||
gatewayEnv: new Proxy(gatewayEnvState, {
|
||||
get: (target, prop: string) => target[prop as keyof typeof target],
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock('@/server/services/gateway/runtimeStatus', () => ({
|
||||
BOT_RUNTIME_STATUSES: {
|
||||
connected: 'connected',
|
||||
disconnected: 'disconnected',
|
||||
dormant: 'dormant',
|
||||
failed: 'failed',
|
||||
queued: 'queued',
|
||||
starting: 'starting',
|
||||
},
|
||||
updateBotRuntimeStatus: mockUpdateBotRuntimeStatus,
|
||||
}));
|
||||
|
||||
function buildContext(opts: { authHeader?: string; body?: unknown; jsonThrows?: boolean }) {
|
||||
const captures: Array<{ body: any; status: number }> = [];
|
||||
const ctx = {
|
||||
body: (b: any, status: number) => {
|
||||
captures.push({ body: b, status });
|
||||
return new Response(b, { status });
|
||||
},
|
||||
json: (b: any, status = 200) => {
|
||||
captures.push({ body: b, status });
|
||||
return Response.json(b, { status });
|
||||
},
|
||||
req: {
|
||||
header: (name: string) =>
|
||||
name.toLowerCase() === 'authorization' ? opts.authHeader : undefined,
|
||||
json: opts.jsonThrows
|
||||
? async () => {
|
||||
throw new Error('bad json');
|
||||
}
|
||||
: async () => opts.body,
|
||||
},
|
||||
} as any;
|
||||
return { ctx, getCaptures: () => captures };
|
||||
}
|
||||
|
||||
const validBody = {
|
||||
applicationId: 'app-1',
|
||||
connectionId: 'conn-1',
|
||||
platform: 'discord',
|
||||
state: { status: 'connected' },
|
||||
};
|
||||
|
||||
describe('gatewayCallback handler', () => {
|
||||
beforeEach(() => {
|
||||
mockUpdateBotRuntimeStatus.mockReset();
|
||||
gatewayEnvState.MESSAGE_GATEWAY_ENABLED = '1';
|
||||
gatewayEnvState.MESSAGE_GATEWAY_SERVICE_TOKEN = 'token-xyz';
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it('returns 204 when MESSAGE_GATEWAY_ENABLED is not "1"', async () => {
|
||||
gatewayEnvState.MESSAGE_GATEWAY_ENABLED = '0';
|
||||
|
||||
const { ctx, getCaptures } = buildContext({ authHeader: 'Bearer wrong', body: validBody });
|
||||
const res = await gatewayCallback(ctx);
|
||||
|
||||
expect(res.status).toBe(204);
|
||||
expect(getCaptures()[0]).toEqual({ body: null, status: 204 });
|
||||
expect(mockUpdateBotRuntimeStatus).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('returns 503 when service token is unset', async () => {
|
||||
gatewayEnvState.MESSAGE_GATEWAY_SERVICE_TOKEN = undefined;
|
||||
const { ctx } = buildContext({ authHeader: 'Bearer x', body: validBody });
|
||||
const res = await gatewayCallback(ctx);
|
||||
expect(res.status).toBe(503);
|
||||
});
|
||||
|
||||
it('returns 401 on bearer mismatch', async () => {
|
||||
const { ctx } = buildContext({ authHeader: 'Bearer wrong', body: validBody });
|
||||
const res = await gatewayCallback(ctx);
|
||||
expect(res.status).toBe(401);
|
||||
});
|
||||
|
||||
it('returns 400 on JSON parse failure', async () => {
|
||||
const { ctx } = buildContext({ authHeader: 'Bearer token-xyz', jsonThrows: true });
|
||||
const res = await gatewayCallback(ctx);
|
||||
expect(res.status).toBe(400);
|
||||
});
|
||||
|
||||
it('returns 400 on zod validation failure (missing connectionId)', async () => {
|
||||
const { ctx, getCaptures } = buildContext({
|
||||
authHeader: 'Bearer token-xyz',
|
||||
body: { ...validBody, connectionId: undefined },
|
||||
});
|
||||
const res = await gatewayCallback(ctx);
|
||||
expect(res.status).toBe(400);
|
||||
expect(getCaptures()[0].body.error).toBe('Invalid body');
|
||||
});
|
||||
|
||||
it('returns 204 silently when applicationId is missing (no status update)', async () => {
|
||||
const { ctx } = buildContext({
|
||||
authHeader: 'Bearer token-xyz',
|
||||
body: { ...validBody, applicationId: undefined },
|
||||
});
|
||||
const res = await gatewayCallback(ctx);
|
||||
expect(res.status).toBe(204);
|
||||
expect(mockUpdateBotRuntimeStatus).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('returns 204 without writing status when state is "connecting"', async () => {
|
||||
const { ctx } = buildContext({
|
||||
authHeader: 'Bearer token-xyz',
|
||||
body: { ...validBody, state: { status: 'connecting' } },
|
||||
});
|
||||
const res = await gatewayCallback(ctx);
|
||||
expect(res.status).toBe(204);
|
||||
expect(mockUpdateBotRuntimeStatus).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it.each([
|
||||
['connected', 'connected'],
|
||||
['disconnected', 'disconnected'],
|
||||
['dormant', 'dormant'],
|
||||
['error', 'failed'],
|
||||
])('maps state.status=%s to runtimeStatus=%s and writes once', async (incoming, expected) => {
|
||||
mockUpdateBotRuntimeStatus.mockResolvedValue(undefined);
|
||||
const { ctx } = buildContext({
|
||||
authHeader: 'Bearer token-xyz',
|
||||
body: {
|
||||
...validBody,
|
||||
state: { error: 'oops', status: incoming },
|
||||
},
|
||||
});
|
||||
|
||||
const res = await gatewayCallback(ctx);
|
||||
|
||||
expect(res.status).toBe(204);
|
||||
expect(mockUpdateBotRuntimeStatus).toHaveBeenCalledTimes(1);
|
||||
expect(mockUpdateBotRuntimeStatus).toHaveBeenCalledWith({
|
||||
applicationId: 'app-1',
|
||||
errorMessage: 'oops',
|
||||
platform: 'discord',
|
||||
status: expected,
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,88 @@
|
||||
// @vitest-environment node
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { gatewayStart } from '../gatewayStart';
|
||||
|
||||
const { mockEnsureRunning, mockStop } = vi.hoisted(() => ({
|
||||
mockEnsureRunning: vi.fn(),
|
||||
mockStop: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('@/server/services/gateway', () => ({
|
||||
GatewayService: vi.fn().mockImplementation(() => ({
|
||||
ensureRunning: mockEnsureRunning,
|
||||
stop: mockStop,
|
||||
})),
|
||||
}));
|
||||
|
||||
function buildContext(opts: { body?: unknown; jsonThrows?: boolean }) {
|
||||
const captures: Array<{ body: any; status: number }> = [];
|
||||
const ctx = {
|
||||
json: (b: any, status = 200) => {
|
||||
captures.push({ body: b, status });
|
||||
return Response.json(b, { status });
|
||||
},
|
||||
req: {
|
||||
json: opts.jsonThrows
|
||||
? async () => {
|
||||
throw new Error('bad json');
|
||||
}
|
||||
: async () => opts.body,
|
||||
},
|
||||
} as any;
|
||||
return { ctx, getCaptures: () => captures };
|
||||
}
|
||||
|
||||
describe('gatewayStart handler', () => {
|
||||
beforeEach(() => {
|
||||
mockEnsureRunning.mockReset().mockResolvedValue(undefined);
|
||||
mockStop.mockReset().mockResolvedValue(undefined);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it('returns status="started" and only calls ensureRunning when restart is falsy', async () => {
|
||||
const { ctx, getCaptures } = buildContext({ body: {} });
|
||||
const res = await gatewayStart(ctx);
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(getCaptures()[0].body).toEqual({ status: 'started' });
|
||||
expect(mockStop).not.toHaveBeenCalled();
|
||||
expect(mockEnsureRunning).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('treats a malformed JSON body as empty {} and starts the gateway', async () => {
|
||||
const { ctx, getCaptures } = buildContext({ jsonThrows: true });
|
||||
const res = await gatewayStart(ctx);
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(getCaptures()[0].body).toEqual({ status: 'started' });
|
||||
expect(mockEnsureRunning).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('stops then ensures running when restart=true and reports "restarted"', async () => {
|
||||
const { ctx, getCaptures } = buildContext({ body: { restart: true } });
|
||||
const res = await gatewayStart(ctx);
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(getCaptures()[0].body).toEqual({ status: 'restarted' });
|
||||
expect(mockStop).toHaveBeenCalledTimes(1);
|
||||
expect(mockEnsureRunning).toHaveBeenCalledTimes(1);
|
||||
// stop must be called before ensureRunning
|
||||
expect(mockStop.mock.invocationCallOrder[0]).toBeLessThan(
|
||||
mockEnsureRunning.mock.invocationCallOrder[0],
|
||||
);
|
||||
});
|
||||
|
||||
it('returns 500 when ensureRunning throws', async () => {
|
||||
mockEnsureRunning.mockRejectedValue(new Error('boom'));
|
||||
const { ctx, getCaptures } = buildContext({ body: {} });
|
||||
|
||||
const res = await gatewayStart(ctx);
|
||||
|
||||
expect(res.status).toBe(500);
|
||||
expect(getCaptures()[0].body).toEqual({ error: 'Failed to start gateway' });
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,88 @@
|
||||
// @vitest-environment node
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { platformWebhook } from '../platformWebhook';
|
||||
|
||||
const { mockGetWebhookHandler, mockGetBotMessageRouter } = vi.hoisted(() => {
|
||||
const handler = vi.fn();
|
||||
return {
|
||||
mockGetBotMessageRouter: vi.fn(() => ({ getWebhookHandler: handler })),
|
||||
mockGetWebhookHandler: handler,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock('@/server/services/bot', () => ({
|
||||
getBotMessageRouter: mockGetBotMessageRouter,
|
||||
}));
|
||||
|
||||
function buildContext(opts: { params: Record<string, string | undefined>; url?: string }) {
|
||||
const rawRequest = new Request(opts.url ?? 'http://x/api/agent/webhooks/telegram/app-1', {
|
||||
method: 'POST',
|
||||
body: '{}',
|
||||
});
|
||||
const captures: Array<{ body: any; status: number }> = [];
|
||||
const ctx = {
|
||||
json: (b: any, status = 200) => {
|
||||
captures.push({ body: b, status });
|
||||
return Response.json(b, { status });
|
||||
},
|
||||
req: {
|
||||
param: (name: string) => opts.params[name],
|
||||
raw: rawRequest,
|
||||
url: rawRequest.url,
|
||||
},
|
||||
} as any;
|
||||
return { ctx, getCaptures: () => captures, rawRequest };
|
||||
}
|
||||
|
||||
describe('platformWebhook handler', () => {
|
||||
beforeEach(() => {
|
||||
mockGetWebhookHandler.mockReset();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it('returns 400 when platform param is missing', async () => {
|
||||
const { ctx, getCaptures } = buildContext({ params: { platform: undefined } });
|
||||
|
||||
const res = await platformWebhook(ctx);
|
||||
|
||||
expect(res.status).toBe(400);
|
||||
expect(getCaptures()[0].body).toEqual({ error: 'platform is required' });
|
||||
expect(mockGetWebhookHandler).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('forwards platform + appId + raw request to the bot message router', async () => {
|
||||
const platformResponse = new Response('ok', { status: 202 });
|
||||
const innerHandler = vi.fn().mockResolvedValue(platformResponse);
|
||||
mockGetWebhookHandler.mockReturnValue(innerHandler);
|
||||
|
||||
const { ctx, rawRequest } = buildContext({
|
||||
params: { appId: 'app-1', platform: 'telegram' },
|
||||
});
|
||||
|
||||
const res = await platformWebhook(ctx);
|
||||
|
||||
expect(res).toBe(platformResponse);
|
||||
expect(mockGetWebhookHandler).toHaveBeenCalledWith('telegram', 'app-1');
|
||||
// Platform-side signature verification depends on the original Request,
|
||||
// so the handler must receive `c.req.raw` verbatim.
|
||||
expect(innerHandler).toHaveBeenCalledTimes(1);
|
||||
expect(innerHandler.mock.calls[0][0]).toBe(rawRequest);
|
||||
});
|
||||
|
||||
it('passes appId=undefined when only platform is provided', async () => {
|
||||
mockGetWebhookHandler.mockImplementation(() => async () => new Response(null, { status: 204 }));
|
||||
|
||||
const { ctx } = buildContext({
|
||||
params: { platform: 'discord' },
|
||||
url: 'http://x/api/agent/webhooks/discord',
|
||||
});
|
||||
|
||||
await platformWebhook(ctx);
|
||||
|
||||
expect(mockGetWebhookHandler).toHaveBeenCalledWith('discord', undefined);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,194 @@
|
||||
// @vitest-environment node
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { runStep, runStepHealth } from '../runStep';
|
||||
|
||||
const mockGetOperationMetadata = vi.fn();
|
||||
const mockExecuteStep = vi.fn();
|
||||
|
||||
vi.mock('@/server/modules/AgentRuntime', () => ({
|
||||
AgentRuntimeCoordinator: vi.fn().mockImplementation(() => ({
|
||||
getOperationMetadata: mockGetOperationMetadata,
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock('@/server/services/agentRuntime', () => ({
|
||||
AgentRuntimeService: vi.fn().mockImplementation(() => ({
|
||||
executeStep: mockExecuteStep,
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock('@/database/core/db-adaptor', () => ({
|
||||
getServerDB: vi.fn().mockResolvedValue({} as any),
|
||||
}));
|
||||
|
||||
function buildContext(opts: { body?: unknown; jsonThrows?: boolean; retried?: string }) {
|
||||
const captures: Array<{ body: any; status: number; headers?: Record<string, string> }> = [];
|
||||
const ctx = {
|
||||
json: (b: any, status = 200, headers?: Record<string, string>) => {
|
||||
captures.push({ body: b, status, headers });
|
||||
return Response.json(b, { status, headers });
|
||||
},
|
||||
req: {
|
||||
header: (name: string) =>
|
||||
name.toLowerCase() === 'upstash-retried' ? opts.retried : undefined,
|
||||
json: opts.jsonThrows
|
||||
? async () => {
|
||||
throw new Error('bad json');
|
||||
}
|
||||
: async () => opts.body,
|
||||
},
|
||||
} as any;
|
||||
return { ctx, getCaptures: () => captures };
|
||||
}
|
||||
|
||||
const validBody = {
|
||||
context: { foo: 'bar' },
|
||||
operationId: 'op-1',
|
||||
stepIndex: 2,
|
||||
};
|
||||
|
||||
describe('runStep handler', () => {
|
||||
beforeEach(() => {
|
||||
mockGetOperationMetadata.mockReset();
|
||||
mockExecuteStep.mockReset();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it('returns 400 when JSON parsing throws', async () => {
|
||||
const { ctx, getCaptures } = buildContext({ jsonThrows: true });
|
||||
const res = await runStep(ctx);
|
||||
expect(res.status).toBe(400);
|
||||
expect(getCaptures()[0].body).toEqual({ error: 'Invalid JSON body' });
|
||||
expect(mockGetOperationMetadata).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('returns 400 when operationId is missing', async () => {
|
||||
const { ctx } = buildContext({ body: { stepIndex: 0 } });
|
||||
const res = await runStep(ctx);
|
||||
expect(res.status).toBe(400);
|
||||
expect(mockGetOperationMetadata).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('returns 401 when operation metadata has no userId', async () => {
|
||||
mockGetOperationMetadata.mockResolvedValue(null);
|
||||
const { ctx, getCaptures } = buildContext({ body: validBody });
|
||||
|
||||
const res = await runStep(ctx);
|
||||
|
||||
expect(res.status).toBe(401);
|
||||
expect(getCaptures()[0].body).toEqual({ error: 'Invalid operation or unauthorized' });
|
||||
expect(mockExecuteStep).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('returns 429 with Retry-After header when the step is locked', async () => {
|
||||
mockGetOperationMetadata.mockResolvedValue({ userId: 'user-1' });
|
||||
mockExecuteStep.mockResolvedValue({
|
||||
locked: true,
|
||||
nextStepScheduled: false,
|
||||
state: {},
|
||||
success: false,
|
||||
});
|
||||
|
||||
const { ctx, getCaptures } = buildContext({ body: validBody });
|
||||
const res = await runStep(ctx);
|
||||
|
||||
expect(res.status).toBe(429);
|
||||
const captured = getCaptures()[0];
|
||||
expect(captured.body).toMatchObject({
|
||||
error: 'Step is currently being executed, retry later',
|
||||
operationId: 'op-1',
|
||||
stepIndex: 2,
|
||||
});
|
||||
expect(captured.headers).toEqual({ 'Retry-After': '37' });
|
||||
});
|
||||
|
||||
it('forwards the upstash-retried header to executeStep as externalRetryCount', async () => {
|
||||
mockGetOperationMetadata.mockResolvedValue({ userId: 'user-1' });
|
||||
mockExecuteStep.mockResolvedValue({
|
||||
nextStepScheduled: false,
|
||||
state: { status: 'done', cost: { total: 0 }, stepCount: 1 },
|
||||
success: true,
|
||||
});
|
||||
|
||||
const { ctx } = buildContext({ body: validBody, retried: '3' });
|
||||
await runStep(ctx);
|
||||
|
||||
expect(mockExecuteStep).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ externalRetryCount: 3, operationId: 'op-1', stepIndex: 2 }),
|
||||
);
|
||||
});
|
||||
|
||||
it('shapes the success response with status, totals and pending fields', async () => {
|
||||
mockGetOperationMetadata.mockResolvedValue({ userId: 'user-1' });
|
||||
mockExecuteStep.mockResolvedValue({
|
||||
nextStepScheduled: true,
|
||||
state: {
|
||||
cost: { total: 0.42 },
|
||||
pendingHumanPrompt: { id: 'p1' },
|
||||
pendingHumanSelect: undefined,
|
||||
pendingToolsCalling: ['t1'],
|
||||
status: 'waiting_for_human',
|
||||
stepCount: 5,
|
||||
},
|
||||
success: true,
|
||||
});
|
||||
|
||||
const { ctx, getCaptures } = buildContext({ body: validBody });
|
||||
const res = await runStep(ctx);
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(getCaptures()[0].body).toMatchObject({
|
||||
completed: false,
|
||||
nextStepIndex: 3,
|
||||
nextStepScheduled: true,
|
||||
operationId: 'op-1',
|
||||
pendingApproval: ['t1'],
|
||||
pendingPrompt: { id: 'p1' },
|
||||
status: 'waiting_for_human',
|
||||
stepIndex: 2,
|
||||
success: true,
|
||||
totalCost: 0.42,
|
||||
totalSteps: 5,
|
||||
waitingForHuman: true,
|
||||
});
|
||||
});
|
||||
|
||||
it('returns 500 on unexpected service errors and echoes operationId', async () => {
|
||||
mockGetOperationMetadata.mockResolvedValue({ userId: 'user-1' });
|
||||
mockExecuteStep.mockRejectedValue(new Error('boom'));
|
||||
|
||||
const { ctx, getCaptures } = buildContext({ body: validBody });
|
||||
const res = await runStep(ctx);
|
||||
|
||||
expect(res.status).toBe(500);
|
||||
expect(getCaptures()[0].body).toMatchObject({
|
||||
error: 'boom',
|
||||
operationId: 'op-1',
|
||||
stepIndex: 2,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('runStepHealth handler', () => {
|
||||
it('returns a healthy payload', () => {
|
||||
const captures: any[] = [];
|
||||
const ctx = {
|
||||
json: (b: any, status = 200) => {
|
||||
captures.push({ body: b, status });
|
||||
return Response.json(b, { status });
|
||||
},
|
||||
} as any;
|
||||
|
||||
const res = runStepHealth(ctx);
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(captures[0].body).toMatchObject({
|
||||
healthy: true,
|
||||
message: 'Agent execution service is running',
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,52 @@
|
||||
import debug from 'debug';
|
||||
import type { Context } from 'hono';
|
||||
|
||||
import { getServerDB } from '@/database/core/db-adaptor';
|
||||
import { BotCallbackService } from '@/server/services/bot/BotCallbackService';
|
||||
|
||||
const log = debug('lobe-server:agent:bot-callback');
|
||||
|
||||
/**
|
||||
* Bot callback endpoint for agent step/completion webhooks.
|
||||
*
|
||||
* In queue mode, AgentRuntimeService fires webhooks (via QStash) after each step
|
||||
* and on completion. This endpoint verifies the signature (via the `qstashAuth`
|
||||
* middleware on the route) and delegates to BotCallbackService.
|
||||
*/
|
||||
export async function botCallback(c: Context): Promise<Response> {
|
||||
let body: any;
|
||||
try {
|
||||
body = await c.req.json();
|
||||
} catch {
|
||||
return c.json({ error: 'Invalid JSON body' }, 400);
|
||||
}
|
||||
|
||||
const { type, applicationId, platformThreadId, progressMessageId } = body;
|
||||
|
||||
log(
|
||||
'bot-callback: type=%s, applicationId=%s, platformThreadId=%s, progressMessageId=%s',
|
||||
type,
|
||||
applicationId,
|
||||
platformThreadId,
|
||||
progressMessageId,
|
||||
);
|
||||
|
||||
if (!type || !applicationId || !platformThreadId) {
|
||||
return c.json({ error: 'Missing required fields: type, applicationId, platformThreadId' }, 400);
|
||||
}
|
||||
|
||||
if (type !== 'step' && type !== 'completion') {
|
||||
return c.json({ error: `Unknown callback type: ${type}` }, 400);
|
||||
}
|
||||
|
||||
try {
|
||||
const serverDB = await getServerDB();
|
||||
const service = new BotCallbackService(serverDB);
|
||||
await service.handleCallback(body);
|
||||
|
||||
return c.json({ success: true });
|
||||
} catch (error) {
|
||||
console.error('bot-callback error:', error);
|
||||
return c.json({ error: error instanceof Error ? error.message : 'Internal error' }, 500);
|
||||
}
|
||||
}
|
||||
+17
-20
@@ -1,5 +1,5 @@
|
||||
import debug from 'debug';
|
||||
import { type NextRequest, NextResponse } from 'next/server';
|
||||
import type { Context } from 'hono';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { gatewayEnv } from '@/envs/gateway';
|
||||
@@ -9,7 +9,7 @@ import {
|
||||
updateBotRuntimeStatus,
|
||||
} from '@/server/services/gateway/runtimeStatus';
|
||||
|
||||
const log = debug('api-route:agent:gateway:callback');
|
||||
const log = debug('lobe-server:agent:gateway-callback');
|
||||
|
||||
const StateChangeSchema = z.object({
|
||||
applicationId: z.string().optional(),
|
||||
@@ -23,49 +23,46 @@ const StateChangeSchema = z.object({
|
||||
|
||||
/**
|
||||
* Receive connection state-change callbacks from the external message gateway.
|
||||
* When a persistent connection (e.g. Discord WebSocket) transitions to
|
||||
* "connected" or "error" asynchronously, the gateway POSTs here so LobeHub
|
||||
* can update the bot runtime status visible to users.
|
||||
* Authenticated with `MESSAGE_GATEWAY_SERVICE_TOKEN`.
|
||||
*
|
||||
* Authenticated with MESSAGE_GATEWAY_SERVICE_TOKEN.
|
||||
* Auth is inline (not a route-level middleware) because the disabled-feature
|
||||
* 204 short-circuit must run *before* the auth check — when the gateway is
|
||||
* off we silently no-op rather than 401 stale callers.
|
||||
*/
|
||||
export async function POST(request: NextRequest) {
|
||||
export async function gatewayCallback(c: Context): Promise<Response> {
|
||||
// Ignore callbacks when gateway is disabled — connections are managed locally,
|
||||
// and stale gateway callbacks (e.g. from disconnectAll during migration) could
|
||||
// overwrite locally-managed status.
|
||||
if (gatewayEnv.MESSAGE_GATEWAY_ENABLED !== '1') {
|
||||
return new NextResponse(null, { status: 204 });
|
||||
return c.body(null, 204);
|
||||
}
|
||||
|
||||
const serviceToken = gatewayEnv.MESSAGE_GATEWAY_SERVICE_TOKEN;
|
||||
if (!serviceToken) {
|
||||
return NextResponse.json({ error: 'Service not configured' }, { status: 503 });
|
||||
return c.json({ error: 'Service not configured' }, 503);
|
||||
}
|
||||
|
||||
const authHeader = request.headers.get('authorization');
|
||||
const authHeader = c.req.header('authorization');
|
||||
if (authHeader !== `Bearer ${serviceToken}`) {
|
||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
|
||||
return c.json({ error: 'Unauthorized' }, 401);
|
||||
}
|
||||
|
||||
let parsed;
|
||||
try {
|
||||
const body = await request.json();
|
||||
const body = await c.req.json();
|
||||
parsed = StateChangeSchema.safeParse(body);
|
||||
} catch {
|
||||
return NextResponse.json({ error: 'Invalid JSON' }, { status: 400 });
|
||||
return c.json({ error: 'Invalid JSON' }, 400);
|
||||
}
|
||||
|
||||
if (!parsed.success) {
|
||||
return NextResponse.json(
|
||||
{ error: 'Invalid body', issues: parsed.error.issues },
|
||||
{ status: 400 },
|
||||
);
|
||||
return c.json({ error: 'Invalid body', issues: parsed.error.issues }, 400);
|
||||
}
|
||||
|
||||
const { applicationId, platform, state } = parsed.data;
|
||||
|
||||
if (!applicationId) {
|
||||
return new NextResponse(null, { status: 204 });
|
||||
return c.body(null, 204);
|
||||
}
|
||||
|
||||
const statusMap: Partial<Record<string, BotRuntimeStatus>> = {
|
||||
@@ -78,7 +75,7 @@ export async function POST(request: NextRequest) {
|
||||
const runtimeStatus = statusMap[state.status];
|
||||
if (!runtimeStatus) {
|
||||
// "connecting" — no status update needed
|
||||
return new NextResponse(null, { status: 204 });
|
||||
return c.body(null, 204);
|
||||
}
|
||||
|
||||
await updateBotRuntimeStatus({
|
||||
@@ -90,5 +87,5 @@ export async function POST(request: NextRequest) {
|
||||
|
||||
log('Updated %s:%s → %s', platform, applicationId, runtimeStatus);
|
||||
|
||||
return new NextResponse(null, { status: 204 });
|
||||
return c.body(null, 204);
|
||||
}
|
||||
+11
-9
@@ -1,5 +1,5 @@
|
||||
import debug from 'debug';
|
||||
import type { NextRequest } from 'next/server';
|
||||
import type { Context } from 'hono';
|
||||
import { after } from 'next/server';
|
||||
|
||||
import { getServerDB } from '@/database/core/db-adaptor';
|
||||
@@ -125,12 +125,14 @@ async function processConnectQueue(remainingMs: number): Promise<number> {
|
||||
return processed;
|
||||
}
|
||||
|
||||
export async function GET(request: NextRequest) {
|
||||
const authHeader = request.headers.get('authorization');
|
||||
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
|
||||
return new Response('Unauthorized', { status: 401 });
|
||||
}
|
||||
|
||||
/**
|
||||
* Cron-driven gateway entry point. Runs once per Vercel cron tick and keeps
|
||||
* persistent bot connections alive for a 10-minute window via `next/server`'s
|
||||
* `after()`.
|
||||
*
|
||||
* Auth: `bearerSecretAuth(CRON_SECRET)` on the route.
|
||||
*/
|
||||
export async function gatewayCron(c: Context): Promise<Response> {
|
||||
// When the external message gateway is enabled, sync connections via gateway.
|
||||
if (process.env.MESSAGE_GATEWAY_URL && process.env.MESSAGE_GATEWAY_SERVICE_TOKEN) {
|
||||
const { GatewayService } = await import('@/server/services/gateway');
|
||||
@@ -138,7 +140,7 @@ export async function GET(request: NextRequest) {
|
||||
|
||||
if (service.useMessageGateway) {
|
||||
await service.ensureRunning();
|
||||
return Response.json({ ensureRunning: true });
|
||||
return c.json({ ensureRunning: true });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,5 +217,5 @@ export async function GET(request: NextRequest) {
|
||||
}
|
||||
});
|
||||
|
||||
return Response.json({ platforms: stats, queued, started, total });
|
||||
return c.json({ platforms: stats, queued, started, total });
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
import type { Context } from 'hono';
|
||||
|
||||
import { GatewayService } from '@/server/services/gateway';
|
||||
|
||||
/**
|
||||
* Non-Vercel `ensureRunning` entry point — used by the standalone server
|
||||
* launcher (`scripts/serverLauncher/startServer.js`). Body: `{ restart?: boolean }`.
|
||||
*
|
||||
* Auth: `bearerSecretAuth(KEY_VAULTS_SECRET)` on the route.
|
||||
*/
|
||||
export async function gatewayStart(c: Context): Promise<Response> {
|
||||
const body = await c.req.json().catch(() => ({}) as Record<string, unknown>);
|
||||
const service = new GatewayService();
|
||||
|
||||
try {
|
||||
if ((body as { restart?: boolean }).restart) {
|
||||
console.info('[GatewayService] Restarting...');
|
||||
await service.stop();
|
||||
}
|
||||
|
||||
await service.ensureRunning();
|
||||
console.info('[GatewayService] Started successfully');
|
||||
|
||||
return c.json({ status: (body as { restart?: boolean }).restart ? 'restarted' : 'started' });
|
||||
} catch (error) {
|
||||
console.error('[GatewayService] Failed to start:', error);
|
||||
return c.json({ error: 'Failed to start gateway' }, 500);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
import debug from 'debug';
|
||||
import type { Context } from 'hono';
|
||||
|
||||
import { getBotMessageRouter } from '@/server/services/bot';
|
||||
|
||||
const log = debug('lobe-server:bot:webhook-route');
|
||||
|
||||
/**
|
||||
* Unified webhook endpoint for Chat SDK bot platforms. Handles both:
|
||||
* - POST /api/agent/webhooks/:platform
|
||||
* - POST /api/agent/webhooks/:platform/:appId
|
||||
*
|
||||
* Hono receives the raw `Request` via `c.req.raw` and forwards it to the
|
||||
* platform-specific handler returned by the bot message router (the platform
|
||||
* is responsible for verifying its own signature).
|
||||
*/
|
||||
export async function platformWebhook(c: Context): Promise<Response> {
|
||||
const platform = c.req.param('platform');
|
||||
const appId = c.req.param('appId');
|
||||
|
||||
if (!platform) {
|
||||
return c.json({ error: 'platform is required' }, 400);
|
||||
}
|
||||
|
||||
log('Received webhook: platform=%s, appId=%s, url=%s', platform, appId ?? '(none)', c.req.url);
|
||||
|
||||
const router = getBotMessageRouter();
|
||||
const handler = router.getWebhookHandler(platform, appId);
|
||||
return handler(c.req.raw);
|
||||
}
|
||||
+31
-47
@@ -1,29 +1,30 @@
|
||||
import debug from 'debug';
|
||||
import { type NextRequest } from 'next/server';
|
||||
import { NextResponse } from 'next/server';
|
||||
import type { Context } from 'hono';
|
||||
|
||||
import { getServerDB } from '@/database/core/db-adaptor';
|
||||
import { verifyQStashSignature } from '@/libs/qstash';
|
||||
import { AgentRuntimeCoordinator } from '@/server/modules/AgentRuntime';
|
||||
import { AgentRuntimeService } from '@/server/services/agentRuntime';
|
||||
|
||||
const log = debug('api-route:agent:execute-step');
|
||||
const log = debug('lobe-server:agent:run-step');
|
||||
|
||||
export async function POST(request: NextRequest) {
|
||||
/**
|
||||
* Execute a single agent step. Invoked by QStash with the body
|
||||
* `{ operationId, stepIndex, context, humanInput?, approvedToolCall?, ... }`.
|
||||
*
|
||||
* Auth: `qstashAuth` on the route — QStash signature required.
|
||||
*/
|
||||
export async function runStep(c: Context): Promise<Response> {
|
||||
const startTime = Date.now();
|
||||
|
||||
// Read raw body for signature verification (must be done before parsing JSON)
|
||||
const rawBody = await request.text();
|
||||
|
||||
// Verify QStash signature
|
||||
const isValidSignature = await verifyQStashSignature(request, rawBody);
|
||||
if (!isValidSignature) {
|
||||
return NextResponse.json({ error: 'Invalid signature' }, { status: 401 });
|
||||
let body: any;
|
||||
try {
|
||||
body = await c.req.json();
|
||||
} catch {
|
||||
return c.json({ error: 'Invalid JSON body' }, 400);
|
||||
}
|
||||
|
||||
// Parse body after verification
|
||||
const body = JSON.parse(rawBody);
|
||||
const externalRetryCount = Number(request.headers.get('Upstash-Retried') ?? 0) || 0;
|
||||
const externalRetryCount = Number(c.req.header('upstash-retried') ?? 0) || 0;
|
||||
|
||||
try {
|
||||
const {
|
||||
operationId,
|
||||
@@ -37,7 +38,7 @@ export async function POST(request: NextRequest) {
|
||||
} = body;
|
||||
|
||||
if (!operationId) {
|
||||
return NextResponse.json({ error: 'operationId is required' }, { status: 400 });
|
||||
return c.json({ error: 'operationId is required' }, 400);
|
||||
}
|
||||
|
||||
log(`[${operationId}] Starting step ${stepIndex}`);
|
||||
@@ -48,14 +49,12 @@ export async function POST(request: NextRequest) {
|
||||
|
||||
if (!metadata?.userId) {
|
||||
log(`[${operationId}] Invalid operation or no userId found`);
|
||||
return NextResponse.json({ error: 'Invalid operation or unauthorized' }, { status: 401 });
|
||||
return c.json({ error: 'Invalid operation or unauthorized' }, 401);
|
||||
}
|
||||
|
||||
// Initialize service with userId from operation metadata
|
||||
const serverDB = await getServerDB();
|
||||
const agentRuntimeService = new AgentRuntimeService(serverDB, metadata.userId);
|
||||
|
||||
// Execute step using AgentRuntimeService
|
||||
const result = await agentRuntimeService.executeStep({
|
||||
approvedToolCall,
|
||||
context,
|
||||
@@ -71,14 +70,10 @@ export async function POST(request: NextRequest) {
|
||||
// Step is currently being executed by another instance — tell QStash to retry later
|
||||
if (result.locked) {
|
||||
log(`[${operationId}] Step ${stepIndex} locked by another instance, returning 429`);
|
||||
return NextResponse.json(
|
||||
return c.json(
|
||||
{ error: 'Step is currently being executed, retry later', operationId, stepIndex },
|
||||
{
|
||||
status: 429,
|
||||
headers: {
|
||||
'Retry-After': '37', // unit: seconds
|
||||
},
|
||||
},
|
||||
429,
|
||||
{ 'Retry-After': '37' },
|
||||
);
|
||||
}
|
||||
|
||||
@@ -106,41 +101,30 @@ export async function POST(request: NextRequest) {
|
||||
`[${operationId}] Step ${stepIndex} completed (${executionTime}ms, status: ${result.state.status})`,
|
||||
);
|
||||
|
||||
return NextResponse.json(responseData);
|
||||
return c.json(responseData);
|
||||
} catch (error: any) {
|
||||
const executionTime = Date.now() - startTime;
|
||||
console.error('Error in execution: %O', error);
|
||||
|
||||
return NextResponse.json(
|
||||
return c.json(
|
||||
{
|
||||
error: error.message,
|
||||
executionTime,
|
||||
operationId: body?.operationId,
|
||||
stepIndex: body?.stepIndex || 0,
|
||||
},
|
||||
{ status: 500 },
|
||||
500,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Health check endpoint
|
||||
* Health check for the agent execution path.
|
||||
*/
|
||||
export async function GET() {
|
||||
try {
|
||||
return NextResponse.json({
|
||||
healthy: true,
|
||||
message: 'Agent execution service is running',
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
} catch (error: any) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: error.message,
|
||||
healthy: false,
|
||||
timestamp: new Date().toISOString(),
|
||||
},
|
||||
{ status: 503 },
|
||||
);
|
||||
}
|
||||
export function runStepHealth(c: Context): Response {
|
||||
return c.json({
|
||||
healthy: true,
|
||||
message: 'Agent execution service is running',
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
@@ -1,8 +1,16 @@
|
||||
import { Hono } from 'hono';
|
||||
|
||||
import { botCallback } from './handlers/botCallback';
|
||||
import { execAgent } from './handlers/execAgent';
|
||||
import { finalizeAbandoned } from './handlers/finalizeAbandoned';
|
||||
import { gatewayCallback } from './handlers/gatewayCallback';
|
||||
import { gatewayCron } from './handlers/gatewayCron';
|
||||
import { gatewayStart } from './handlers/gatewayStart';
|
||||
import { platformWebhook } from './handlers/platformWebhook';
|
||||
import { runStep, runStepHealth } from './handlers/runStep';
|
||||
import { toolResult } from './handlers/toolResult';
|
||||
import { bearerSecretAuth } from './middlewares/bearerSecretAuth';
|
||||
import { qstashAuth } from './middlewares/qstashAuth';
|
||||
import { qstashOrApiKeyAuth } from './middlewares/qstashOrApiKeyAuth';
|
||||
import { serviceTokenAuth } from './middlewares/serviceTokenAuth';
|
||||
|
||||
@@ -10,9 +18,8 @@ import { serviceTokenAuth } from './middlewares/serviceTokenAuth';
|
||||
* Hono app for `/api/agent/*` endpoints. Mounted via the Next.js optional
|
||||
* catch-all at `src/app/(backend)/api/agent/[[...route]]/route.ts`.
|
||||
*
|
||||
* Routing precedence: existing static `route.ts` files (e.g. `run/route.ts`,
|
||||
* `stream/route.ts`, `gateway/*`, `webhooks/*`) win over the catch-all, so
|
||||
* individual paths can migrate one at a time — delete the static `route.ts`
|
||||
* Routing precedence: existing static `route.ts` files win over the catch-all,
|
||||
* so individual paths can migrate one at a time — delete the static `route.ts`
|
||||
* and add the corresponding handler here.
|
||||
*/
|
||||
const app = new Hono().basePath('/api/agent');
|
||||
@@ -20,6 +27,10 @@ const app = new Hono().basePath('/api/agent');
|
||||
// POST /api/agent — start a new agent operation (QStash sig OR API key)
|
||||
app.post('/', qstashOrApiKeyAuth(), execAgent);
|
||||
|
||||
// POST /api/agent/run — execute a single step (QStash signature)
|
||||
app.post('/run', qstashAuth(), runStep);
|
||||
app.get('/run', runStepHealth);
|
||||
|
||||
// POST /api/agent/tool-result — gateway-side tool result LPUSH'd to Redis
|
||||
app.post('/tool-result', serviceTokenAuth(), toolResult);
|
||||
|
||||
@@ -33,4 +44,28 @@ app.get('/finalize-abandoned', (c) =>
|
||||
}),
|
||||
);
|
||||
|
||||
// GET /api/agent/gateway — Vercel cron entry point (Bearer CRON_SECRET)
|
||||
app.get(
|
||||
'/gateway',
|
||||
bearerSecretAuth(() => process.env.CRON_SECRET),
|
||||
gatewayCron,
|
||||
);
|
||||
|
||||
// POST /api/agent/gateway/start — non-Vercel ensureRunning (Bearer KEY_VAULTS_SECRET)
|
||||
app.post(
|
||||
'/gateway/start',
|
||||
bearerSecretAuth(() => process.env.KEY_VAULTS_SECRET),
|
||||
gatewayStart,
|
||||
);
|
||||
|
||||
// POST /api/agent/gateway/callback — message gateway state-change callbacks
|
||||
// (auth is inline so the disabled-feature 204 short-circuits before auth)
|
||||
app.post('/gateway/callback', gatewayCallback);
|
||||
|
||||
// POST /api/agent/webhooks/bot-callback — agent step/completion webhooks (QStash)
|
||||
app.post('/webhooks/bot-callback', qstashAuth(), botCallback);
|
||||
|
||||
// POST /api/agent/webhooks/:platform[/:appId] — Chat SDK bot platform webhooks
|
||||
app.post('/webhooks/:platform/:appId?', platformWebhook);
|
||||
|
||||
export default app;
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
// @vitest-environment node
|
||||
import { afterEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { bearerSecretAuth } from '../bearerSecretAuth';
|
||||
|
||||
function buildContext(authHeader?: string) {
|
||||
let captured: { body: any; status: number } | undefined;
|
||||
const ctx = {
|
||||
json: (b: any, status = 200) => {
|
||||
captured = { body: b, status };
|
||||
return Response.json(b, { status });
|
||||
},
|
||||
req: {
|
||||
header: (name: string) => (name.toLowerCase() === 'authorization' ? authHeader : undefined),
|
||||
path: '/api/agent/gateway',
|
||||
},
|
||||
} as any;
|
||||
return { ctx, getCaptured: () => captured };
|
||||
}
|
||||
|
||||
describe('bearerSecretAuth middleware', () => {
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it('returns 503 when the secret is unset', async () => {
|
||||
const next = vi.fn();
|
||||
const { ctx, getCaptured } = buildContext('Bearer anything');
|
||||
|
||||
const res = await bearerSecretAuth(() => undefined)(ctx, next);
|
||||
|
||||
expect(res?.status).toBe(503);
|
||||
expect(getCaptured()?.body).toEqual({ error: 'Service not configured' });
|
||||
expect(next).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('returns 401 when the authorization header is missing', async () => {
|
||||
const next = vi.fn();
|
||||
const { ctx, getCaptured } = buildContext(undefined);
|
||||
|
||||
const res = await bearerSecretAuth(() => 'shh')(ctx, next);
|
||||
|
||||
expect(res?.status).toBe(401);
|
||||
expect(getCaptured()?.body).toEqual({ error: 'Unauthorized' });
|
||||
expect(next).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('returns 401 when the authorization header does not match', async () => {
|
||||
const next = vi.fn();
|
||||
const { ctx } = buildContext('Bearer wrong');
|
||||
|
||||
const res = await bearerSecretAuth(() => 'shh')(ctx, next);
|
||||
|
||||
expect(res?.status).toBe(401);
|
||||
expect(next).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('calls next when the bearer token matches', async () => {
|
||||
const next = vi.fn().mockResolvedValue(undefined);
|
||||
const { ctx } = buildContext('Bearer shh');
|
||||
|
||||
await bearerSecretAuth(() => 'shh')(ctx, next);
|
||||
|
||||
expect(next).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('evaluates the secret getter on each request (no cache)', async () => {
|
||||
const getSecret = vi.fn();
|
||||
getSecret.mockReturnValueOnce(undefined).mockReturnValueOnce('s');
|
||||
|
||||
const mw = bearerSecretAuth(getSecret);
|
||||
|
||||
// first call: secret unset → 503
|
||||
const first = await mw(buildContext('Bearer s').ctx, vi.fn());
|
||||
expect(first?.status).toBe(503);
|
||||
|
||||
// second call: secret now set → 200/next
|
||||
const next = vi.fn().mockResolvedValue(undefined);
|
||||
await mw(buildContext('Bearer s').ctx, next);
|
||||
expect(next).toHaveBeenCalledTimes(1);
|
||||
expect(getSecret).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,64 @@
|
||||
// @vitest-environment node
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
// Imported after the mock so the middleware picks up the stubbed verifier.
|
||||
import { qstashAuth } from '../qstashAuth';
|
||||
|
||||
const mockVerify = vi.fn<(req: Request, body: string) => Promise<boolean>>();
|
||||
vi.mock('@/libs/qstash', () => ({
|
||||
verifyQStashSignature: (req: Request, body: string) => mockVerify(req, body),
|
||||
}));
|
||||
|
||||
function buildContext(rawBody: string) {
|
||||
const rawRequest = new Request('http://x/api/agent/run', {
|
||||
method: 'POST',
|
||||
body: rawBody,
|
||||
});
|
||||
let captured: { body: any; status: number } | undefined;
|
||||
const ctx = {
|
||||
json: (b: any, status = 200) => {
|
||||
captured = { body: b, status };
|
||||
return Response.json(b, { status });
|
||||
},
|
||||
req: {
|
||||
path: '/api/agent/run',
|
||||
raw: rawRequest,
|
||||
text: async () => rawBody,
|
||||
},
|
||||
} as any;
|
||||
return { ctx, getCaptured: () => captured };
|
||||
}
|
||||
|
||||
describe('qstashAuth middleware', () => {
|
||||
beforeEach(() => {
|
||||
mockVerify.mockReset();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it('passes the raw body and request to verifyQStashSignature', async () => {
|
||||
mockVerify.mockResolvedValue(true);
|
||||
const next = vi.fn().mockResolvedValue(undefined);
|
||||
const { ctx } = buildContext('{"hello":"world"}');
|
||||
|
||||
await qstashAuth()(ctx, next);
|
||||
|
||||
expect(mockVerify).toHaveBeenCalledTimes(1);
|
||||
expect(mockVerify).toHaveBeenCalledWith(ctx.req.raw, '{"hello":"world"}');
|
||||
expect(next).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('rejects with 401 when signature is invalid', async () => {
|
||||
mockVerify.mockResolvedValue(false);
|
||||
const next = vi.fn();
|
||||
const { ctx, getCaptured } = buildContext('{"hello":"world"}');
|
||||
|
||||
const res = await qstashAuth()(ctx, next);
|
||||
|
||||
expect(res?.status).toBe(401);
|
||||
expect(getCaptured()?.body).toEqual({ error: 'Invalid signature' });
|
||||
expect(next).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,31 @@
|
||||
import debug from 'debug';
|
||||
import type { MiddlewareHandler } from 'hono';
|
||||
|
||||
const log = debug('lobe-server:agent:bearer-secret-auth');
|
||||
|
||||
/**
|
||||
* Hono middleware factory that requires `Authorization: Bearer <secret>`
|
||||
* matching a runtime-evaluated secret.
|
||||
*
|
||||
* - `getSecret` is invoked per request (do not capture the value at module
|
||||
* load time — env vars may not be populated yet).
|
||||
* - Returns `503` when the secret is unset (matches the existing
|
||||
* `serviceTokenAuth` behavior).
|
||||
* - Returns `401` on header mismatch.
|
||||
*/
|
||||
export const bearerSecretAuth =
|
||||
(getSecret: () => string | undefined): MiddlewareHandler =>
|
||||
async (c, next) => {
|
||||
const secret = getSecret();
|
||||
if (!secret) {
|
||||
log('Secret is not configured for %s', c.req.path);
|
||||
return c.json({ error: 'Service not configured' }, 503);
|
||||
}
|
||||
|
||||
const authHeader = c.req.header('authorization');
|
||||
if (authHeader !== `Bearer ${secret}`) {
|
||||
return c.json({ error: 'Unauthorized' }, 401);
|
||||
}
|
||||
|
||||
await next();
|
||||
};
|
||||
@@ -0,0 +1,25 @@
|
||||
import debug from 'debug';
|
||||
import type { MiddlewareHandler } from 'hono';
|
||||
|
||||
import { verifyQStashSignature } from '@/libs/qstash';
|
||||
|
||||
const log = debug('lobe-server:agent:qstash-auth');
|
||||
|
||||
/**
|
||||
* Hono middleware that requires a valid QStash signature on the request.
|
||||
*
|
||||
* The body is consumed via `c.req.text()` to compute the QStash HMAC;
|
||||
* downstream handlers can still call `c.req.json()` thanks to Hono's
|
||||
* bodyCache cross-conversion.
|
||||
*/
|
||||
export const qstashAuth = (): MiddlewareHandler => async (c, next) => {
|
||||
const rawBody = await c.req.text();
|
||||
const isValid = await verifyQStashSignature(c.req.raw, rawBody);
|
||||
|
||||
if (!isValid) {
|
||||
log('Rejected: invalid QStash signature on %s', c.req.path);
|
||||
return c.json({ error: 'Invalid signature' }, 401);
|
||||
}
|
||||
|
||||
await next();
|
||||
};
|
||||
Reference in New Issue
Block a user