🐛 fix: message gateway queue error (#13816)

* fix: gateway sync

* fix: skip error connection

* feat: add disconnect all & MESSAGE_GATEWAY_ENABLED env variable

* chore: add gateway test case

* chore: clean lobehub connections when switch to message gateway

* chore: optimize disconnect all

* chore: disconnect gateway connections when using lobehub gateway

* chore: clean up existing gateway connections after reconnect and avoid gateway callback when not enabled
This commit is contained in:
Rdmclin2
2026-04-14 22:10:17 +08:00
committed by GitHub
parent c70ac84da7
commit 636a3b77c3
7 changed files with 480 additions and 28 deletions
+4 -2
View File
@@ -413,7 +413,9 @@ OPENAI_API_KEY=sk-xxxxxxxxx
# #### Message Gateway (IM Integration) ##
# #######################################
# URL of the message-gateway Cloudflare Worker for unified IM platform connection management
# When set, LobeHub delegates all platform connections to the external gateway
# External message-gateway for unified IM platform connection management.
# Set ENABLED=1 to activate. To migrate away, remove ENABLED first (keep URL/TOKEN)
# so LobeHub can automatically disconnect leftover gateway connections.
# MESSAGE_GATEWAY_ENABLED=1
# MESSAGE_GATEWAY_URL=https://message-gateway.lobehub.com
# MESSAGE_GATEWAY_SERVICE_TOKEN=your_service_token_here
@@ -0,0 +1,93 @@
import debug from 'debug';
import { type NextRequest, NextResponse } from 'next/server';
import { z } from 'zod';
import { gatewayEnv } from '@/envs/gateway';
import {
BOT_RUNTIME_STATUSES,
type BotRuntimeStatus,
updateBotRuntimeStatus,
} from '@/server/services/gateway/runtimeStatus';
const log = debug('api-route:agent:gateway:callback');

// Connection state as reported by the gateway: a status plus an optional error text.
const ConnectionStateSchema = z.object({
  error: z.string().optional(),
  status: z.enum(['connected', 'connecting', 'disconnected', 'error']),
});

// Body of a state-change callback. `applicationId` is optional; the other
// identifiers are required strings.
const StateChangeSchema = z.object({
  applicationId: z.string().optional(),
  connectionId: z.string(),
  platform: z.string(),
  state: ConnectionStateSchema,
});
/**
 * Callback endpoint for connection state changes reported by the external
 * message gateway (e.g. a Discord WebSocket that becomes "connected" or
 * fails asynchronously). The reported state is translated into a bot runtime
 * status and persisted through `updateBotRuntimeStatus`.
 *
 * Checks are applied in this order:
 * 1. 204 (no-op) unless MESSAGE_GATEWAY_ENABLED is exactly '1'.
 * 2. 503 when MESSAGE_GATEWAY_SERVICE_TOKEN is not set.
 * 3. 401 unless the Authorization header equals `Bearer <service token>`.
 * 4. 400 when the body is not valid JSON or does not match StateChangeSchema.
 * 5. 204 (no-op) when `applicationId` is absent or the status is "connecting".
 * 6. Otherwise the mapped status is written and 204 is returned.
 *
 * @param request - Incoming callback request from the gateway.
 * @returns An empty 204 response on success/no-op, or a JSON error response.
 */
export async function POST(request: NextRequest) {
  const noContent = () => new NextResponse(null, { status: 204 });

  // With the gateway disabled, connections are managed locally. Stale gateway
  // callbacks (e.g. triggered by disconnectAll during migration) must not
  // overwrite the locally-managed status, so they are acknowledged and dropped.
  if (gatewayEnv.MESSAGE_GATEWAY_ENABLED !== '1') return noContent();

  const serviceToken = gatewayEnv.MESSAGE_GATEWAY_SERVICE_TOKEN;
  if (!serviceToken) {
    return NextResponse.json({ error: 'Service not configured' }, { status: 503 });
  }

  const expectedHeader = `Bearer ${serviceToken}`;
  if (request.headers.get('authorization') !== expectedHeader) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
  }

  let outcome;
  try {
    outcome = StateChangeSchema.safeParse(await request.json());
  } catch {
    return NextResponse.json({ error: 'Invalid JSON' }, { status: 400 });
  }

  if (!outcome.success) {
    return NextResponse.json(
      { error: 'Invalid body', issues: outcome.error.issues },
      { status: 400 },
    );
  }

  const { applicationId, platform, state } = outcome.data;
  if (!applicationId) return noContent();

  let runtimeStatus: BotRuntimeStatus | undefined;
  switch (state.status) {
    case 'connected': {
      runtimeStatus = BOT_RUNTIME_STATUSES.connected;
      break;
    }
    case 'disconnected': {
      runtimeStatus = BOT_RUNTIME_STATUSES.disconnected;
      break;
    }
    case 'error': {
      runtimeStatus = BOT_RUNTIME_STATUSES.failed;
      break;
    }
    default: {
      // "connecting" — no status update needed
      runtimeStatus = undefined;
    }
  }

  if (!runtimeStatus) return noContent();

  await updateBotRuntimeStatus({
    applicationId,
    errorMessage: state.error,
    platform,
    status: runtimeStatus,
  });

  log('Updated %s:%s → %s', platform, applicationId, runtimeStatus);

  return noContent();
}
+6 -4
View File
@@ -123,13 +123,15 @@ export async function GET(request: NextRequest) {
return new Response('Unauthorized', { status: 401 });
}
// When an external message gateway is configured, use ensureRunning to sync
// connections via the gateway instead of starting bots in-process.
// When the external message gateway is enabled, sync connections via gateway.
if (process.env.MESSAGE_GATEWAY_URL && process.env.MESSAGE_GATEWAY_SERVICE_TOKEN) {
const { GatewayService } = await import('@/server/services/gateway');
const service = new GatewayService();
await service.ensureRunning();
return Response.json({ ensureRunning: true, reason: 'synced via external message gateway' });
if (service.useMessageGateway) {
await service.ensureRunning();
return Response.json({ ensureRunning: true });
}
}
const platforms = platformRegistry.listPlatforms();
+2
View File
@@ -6,6 +6,7 @@ export const getGatewayConfig = () => {
runtimeEnv: {
DEVICE_GATEWAY_SERVICE_TOKEN: process.env.DEVICE_GATEWAY_SERVICE_TOKEN,
DEVICE_GATEWAY_URL: process.env.DEVICE_GATEWAY_URL,
MESSAGE_GATEWAY_ENABLED: process.env.MESSAGE_GATEWAY_ENABLED,
MESSAGE_GATEWAY_SERVICE_TOKEN: process.env.MESSAGE_GATEWAY_SERVICE_TOKEN,
MESSAGE_GATEWAY_URL: process.env.MESSAGE_GATEWAY_URL,
},
@@ -13,6 +14,7 @@ export const getGatewayConfig = () => {
server: {
DEVICE_GATEWAY_SERVICE_TOKEN: z.string().optional(),
DEVICE_GATEWAY_URL: z.string().url().optional(),
MESSAGE_GATEWAY_ENABLED: z.string().optional(),
MESSAGE_GATEWAY_SERVICE_TOKEN: z.string().optional(),
MESSAGE_GATEWAY_URL: z.string().url().optional(),
},
@@ -83,6 +83,19 @@ export class MessageGatewayClient {
return res.json();
}
/**
 * Ask the gateway to drop every connection by issuing
 * `DELETE /api/connections`.
 *
 * @returns The parsed JSON response body (typed as `{ total }`).
 * @throws Error carrying the HTTP status and response text when the
 *   gateway answers with a non-OK status.
 */
async disconnectAll(): Promise<{ total: number }> {
  log('Disconnecting all connections');

  const response = await this.fetch('/api/connections', { method: 'DELETE' });
  if (response.ok) return response.json();

  const detail = await response.text();
  throw new Error(`message-gateway disconnect-all failed (${response.status}): ${detail}`);
}
async disconnect(connectionId: string): Promise<{ status: string }> {
log('Disconnecting %s', connectionId);
@@ -0,0 +1,287 @@
// @vitest-environment node
import { beforeEach, describe, expect, it, vi } from 'vitest';
// ─── Import after mocks ───
import { GatewayService } from '../index';
// ─── Hoisted mocks ───
// Stand-in for the message-gateway client. `isConfigured` is a plain mutable
// flag that individual tests flip; the suite's beforeEach resets it to false.
const mockGatewayClient = vi.hoisted(() => ({
connect: vi.fn(),
disconnect: vi.fn(),
disconnectAll: vi.fn(),
getStats: vi.fn(),
getStatus: vi.fn(),
isConfigured: false,
}));
// Mutable replacement for `gatewayEnv`; tests set MESSAGE_GATEWAY_ENABLED directly.
const mockGatewayEnv = vi.hoisted(() => ({
MESSAGE_GATEWAY_ENABLED: undefined as string | undefined,
}));
// Stand-in for the in-process GatewayManager. `isRunning` also decides what the
// mocked `getGatewayManager` returns (see the '../GatewayManager' mock below).
const mockGatewayManager = vi.hoisted(() => ({
isRunning: false,
start: vi.fn(),
startClient: vi.fn(),
stop: vi.fn(),
stopClient: vi.fn(),
}));
// Individual function mocks wired into the module mocks below.
const mockFindEnabledByPlatform = vi.hoisted(() => vi.fn());
const mockGetServerDB = vi.hoisted(() => vi.fn());
const mockInitWithEnvKey = vi.hoisted(() => vi.fn());
const mockUpdateBotRuntimeStatus = vi.hoisted(() => vi.fn());
const mockGetEffectiveConnectionMode = vi.hoisted(() => vi.fn());
// ─── Module mocks ───
// Environment: expose the mutable mock object as `gatewayEnv`.
vi.mock('@/envs/gateway', () => ({
gatewayEnv: mockGatewayEnv,
}));
// Gateway client factory always hands back the shared mock client.
vi.mock('../MessageGatewayClient', () => ({
getMessageGatewayClient: () => mockGatewayClient,
}));
// `createGatewayManager` always returns the mock manager, while
// `getGatewayManager` returns it only when `isRunning` is true (null otherwise).
vi.mock('../GatewayManager', () => ({
createGatewayManager: () => mockGatewayManager,
getGatewayManager: () => (mockGatewayManager.isRunning ? mockGatewayManager : null),
}));
vi.mock('@/database/core/db-adaptor', () => ({
getServerDB: mockGetServerDB,
}));
// Only the static `findEnabledByPlatform` lookup of the model is stubbed.
vi.mock('@/database/models/agentBotProvider', () => ({
AgentBotProviderModel: {
findEnabledByPlatform: mockFindEnabledByPlatform,
},
}));
vi.mock('@/server/modules/KeyVaultsEncrypt', () => ({
KeyVaultsGateKeeper: { initWithEnvKey: mockInitWithEnvKey },
}));
// Runtime status constants are replaced with plain strings so assertions can
// match on literal values such as 'starting' or 'connected'.
vi.mock('../runtimeStatus', () => ({
BOT_RUNTIME_STATUSES: {
connected: 'connected',
disconnected: 'disconnected',
failed: 'failed',
queued: 'queued',
starting: 'starting',
},
updateBotRuntimeStatus: mockUpdateBotRuntimeStatus,
}));
// Platform registry stub: two platforms are listed (discord, telegram);
// `getPlatform` always resolves to the discord definition.
vi.mock('../../bot/platforms', () => ({
getEffectiveConnectionMode: mockGetEffectiveConnectionMode,
platformRegistry: {
getPlatform: () => ({ id: 'discord' }),
listPlatforms: () => [{ id: 'discord' }, { id: 'telegram' }],
},
}));
describe('GatewayService', () => {
  let service: GatewayService;

  /** Put the mocked environment into gateway mode (ENABLED=1, client configured). */
  const enableGateway = () => {
    mockGatewayEnv.MESSAGE_GATEWAY_ENABLED = '1';
    mockGatewayClient.isConfigured = true;
  };

  /**
   * Make every platform lookup return one enabled provider ('prov-1' / 'app-1')
   * and fix the connection mode resolved for it.
   */
  const stubProvider = (mode: string, credentials: Record<string, unknown> = {}) => {
    mockFindEnabledByPlatform.mockResolvedValue([
      { applicationId: 'app-1', credentials, id: 'prov-1', settings: {}, userId: 'u1' },
    ]);
    mockGetEffectiveConnectionMode.mockReturnValue(mode);
  };

  beforeEach(() => {
    vi.clearAllMocks();

    // Default state: gateway disabled, nothing running, no providers.
    mockGatewayClient.isConfigured = false;
    mockGatewayEnv.MESSAGE_GATEWAY_ENABLED = undefined;
    mockGatewayManager.isRunning = false;

    mockGetServerDB.mockResolvedValue({});
    mockInitWithEnvKey.mockResolvedValue({});
    mockFindEnabledByPlatform.mockResolvedValue([]);
    mockUpdateBotRuntimeStatus.mockResolvedValue({});

    service = new GatewayService();
  });

  // ─── useMessageGateway ───

  describe('useMessageGateway', () => {
    it('returns false when ENABLED is not set', () => {
      mockGatewayClient.isConfigured = true;

      expect(service.useMessageGateway).toBe(false);
    });

    it('returns false when client is not configured', () => {
      mockGatewayEnv.MESSAGE_GATEWAY_ENABLED = '1';
      mockGatewayClient.isConfigured = false;

      expect(service.useMessageGateway).toBe(false);
    });

    it('returns true when ENABLED=1 and client is configured', () => {
      enableGateway();

      expect(service.useMessageGateway).toBe(true);
    });
  });

  // ─── ensureRunning ───

  describe('ensureRunning', () => {
    describe('in-process mode (gateway disabled)', () => {
      it('starts local GatewayManager', async () => {
        await service.ensureRunning();

        expect(mockGatewayManager.start).toHaveBeenCalled();
      });

      it('skips start if GatewayManager already running', async () => {
        mockGatewayManager.isRunning = true;

        await service.ensureRunning();

        expect(mockGatewayManager.start).not.toHaveBeenCalled();
      });
    });

    describe('gateway mode (ENABLED=1)', () => {
      beforeEach(enableGateway);

      it('calls syncGatewayConnections instead of starting local manager', async () => {
        await service.ensureRunning();

        expect(mockGatewayManager.start).not.toHaveBeenCalled();
      });
    });
  });

  // ─── syncGatewayConnections ───

  describe('syncGatewayConnections (via ensureRunning)', () => {
    beforeEach(enableGateway);

    it('skips webhook-mode providers', async () => {
      stubProvider('webhook');

      await service.ensureRunning();

      expect(mockGatewayClient.connect).not.toHaveBeenCalled();
    });

    // Gateway states for which the sync must leave the connection untouched.
    it.each([
      ['skips already connected providers', { status: 'connected' }],
      ['skips connecting providers', { status: 'connecting' }],
      [
        'skips providers in error state',
        { error: 'Session expired (errcode -14)', status: 'error' },
      ],
    ])('%s', async (_title, state) => {
      stubProvider('websocket');
      mockGatewayClient.getStatus.mockResolvedValue({ state });

      await service.ensureRunning();

      expect(mockGatewayClient.connect).not.toHaveBeenCalled();
    });

    it('connects disconnected providers', async () => {
      stubProvider('websocket', { token: 'x' });
      mockGatewayClient.getStatus.mockResolvedValue({
        state: { status: 'disconnected' },
      });
      mockGatewayClient.connect.mockResolvedValue({ status: 'connecting' });

      await service.ensureRunning();

      const expectedConnectArgs = expect.objectContaining({
        applicationId: 'app-1',
        connectionId: 'prov-1',
        platform: 'discord',
      });
      expect(mockGatewayClient.connect).toHaveBeenCalledWith(expectedConnectArgs);
      expect(mockUpdateBotRuntimeStatus).toHaveBeenCalledWith(
        expect.objectContaining({ status: 'starting' }),
      );
    });

    it('sets connected status for sync connect result', async () => {
      stubProvider('websocket');
      mockGatewayClient.getStatus.mockRejectedValue(new Error('not found'));
      mockGatewayClient.connect.mockResolvedValue({ status: 'connected' });

      await service.ensureRunning();

      expect(mockUpdateBotRuntimeStatus).toHaveBeenCalledWith(
        expect.objectContaining({ status: 'connected' }),
      );
    });

    it('tries to connect when status check fails', async () => {
      stubProvider('websocket');
      mockGatewayClient.getStatus.mockRejectedValue(new Error('DO not found'));
      mockGatewayClient.connect.mockResolvedValue({ status: 'connecting' });

      await service.ensureRunning();

      expect(mockGatewayClient.connect).toHaveBeenCalled();
    });

    it('handles connect failure gracefully', async () => {
      stubProvider('websocket');
      mockGatewayClient.getStatus.mockResolvedValue({
        state: { status: 'disconnected' },
      });
      mockGatewayClient.connect.mockRejectedValue(new Error('timeout'));

      // Should not throw
      await expect(service.ensureRunning()).resolves.toBeUndefined();
      expect(mockUpdateBotRuntimeStatus).not.toHaveBeenCalled();
    });
  });
});
+75 -22
View File
@@ -2,6 +2,7 @@ import debug from 'debug';
import { getServerDB } from '@/database/core/db-adaptor';
import { AgentBotProviderModel } from '@/database/models/agentBotProvider';
import { gatewayEnv } from '@/envs/gateway';
import { KeyVaultsGateKeeper } from '@/server/modules/KeyVaultsEncrypt';
import type { ConnectionMode } from '../bot/platforms';
@@ -17,12 +18,13 @@ const isVercel = !!process.env.VERCEL_ENV;
export class GatewayService {
/**
* Check if the external message-gateway is configured.
* When enabled, all platforms are registered on the gateway for
* connection management and typing persistence.
* Whether to use the external message-gateway for connection management.
* Requires MESSAGE_GATEWAY_ENABLED=1 plus URL/TOKEN to be configured.
* This allows disabling the gateway (for migration) while keeping
* the client reachable for cleanup.
*/
get useMessageGateway(): boolean {
return getMessageGatewayClient().isConfigured;
return gatewayEnv.MESSAGE_GATEWAY_ENABLED === '1' && getMessageGatewayClient().isConfigured;
}
async ensureRunning(): Promise<void> {
@@ -37,10 +39,24 @@ export class GatewayService {
return;
}
// Start local connections first, then clean up gateway —
// brief overlap is better than a gap where messages are lost.
const manager = createGatewayManager({ definitions: platformRegistry.listPlatforms() });
await manager.start();
log('GatewayManager started');
// Clean up leftover gateway connections to prevent duplicates.
const client = getMessageGatewayClient();
if (client.isConfigured) {
try {
const result = await client.disconnectAll();
if (result.total > 0) {
log('Cleaned up %d gateway connections', result.total);
}
} catch (err) {
log('Gateway cleanup skipped (non-critical): %O', err);
}
}
}
/**
@@ -56,6 +72,10 @@ export class GatewayService {
const serverDB = await getServerDB();
const gateKeeper = await KeyVaultsGateKeeper.initWithEnvKey();
let totalSynced = 0;
let totalSkipped = 0;
let totalFailed = 0;
// Sync all registered platforms
for (const definition of platformRegistry.listPlatforms()) {
const platform = definition.id;
@@ -66,7 +86,10 @@ export class GatewayService {
gateKeeper,
);
log('Gateway sync: found %d enabled providers for %s', providers.length, platform);
let synced = 0;
let skippedWebhook = 0;
let skippedConnected = 0;
let failed = 0;
for (const provider of providers) {
try {
@@ -74,26 +97,26 @@ export class GatewayService {
const connectionMode = getEffectiveConnectionMode(definition, provider.settings);
// Webhook-mode platforms don't need persistent gateway connections.
// Run the platform client locally via GatewayManager instead
// (e.g. Telegram setWebhook, QQ credential verification).
// The webhook URL is set once when the user saves the bot config
// (via startClientViaGateway). No action needed during periodic sync.
if (connectionMode === 'webhook') {
const manager = createGatewayManager({
definitions: platformRegistry.listPlatforms(),
});
await manager.startClient(platform, provider.applicationId, provider.userId);
log(
'Gateway sync: started webhook-mode %s:%s locally',
platform,
provider.applicationId,
);
skippedWebhook++;
continue;
}
// For persistent connections, check gateway status before reconnecting
try {
const status = await client.getStatus(provider.id);
if (status.state.status === 'connected') {
log('Gateway sync: %s already connected, skipping', provider.id);
if (status.state.status === 'connected' || status.state.status === 'connecting') {
skippedConnected++;
log('Gateway sync: %s already %s, skipping', provider.id, status.state.status);
continue;
}
// "error" means credential/config issue (e.g. session expired, unauthorized).
// Auto-retry is pointless — only user action (saving new credentials) can fix it.
if (status.state.status === 'error') {
skippedConnected++;
log('Gateway sync: %s in error (%s), skipping', provider.id, status.state.error);
continue;
}
} catch {
@@ -101,7 +124,7 @@ export class GatewayService {
}
const webhookPath = `/api/agent/webhooks/${platform}/${provider.applicationId}`;
await client.connect({
const result = await client.connect({
applicationId: provider.applicationId,
connectionId: provider.id,
connectionMode,
@@ -111,21 +134,51 @@ export class GatewayService {
webhookPath,
});
// Gateway returns "connecting" for async persistent connections
// (e.g. Discord WebSocket), "connected" for sync webhook-mode.
const runtimeStatus =
result.status === 'connected'
? BOT_RUNTIME_STATUSES.connected
: BOT_RUNTIME_STATUSES.starting;
await updateBotRuntimeStatus({
applicationId: provider.applicationId,
platform,
status: BOT_RUNTIME_STATUSES.connected,
status: runtimeStatus,
});
log('Gateway sync: connected %s:%s', platform, provider.applicationId);
synced++;
log('Gateway sync: %s %s:%s', result.status, platform, provider.applicationId);
} catch (err) {
failed++;
log('Gateway sync: failed to connect %s:%s: %O', platform, provider.applicationId, err);
}
}
log(
'Gateway sync: %s — total=%d synced=%d skippedWebhook=%d skippedConnected=%d failed=%d',
platform,
providers.length,
synced,
skippedWebhook,
skippedConnected,
failed,
);
totalSynced += synced;
totalSkipped += skippedWebhook + skippedConnected;
totalFailed += failed;
} catch (err) {
log('Gateway sync: error syncing platform %s: %O', platform, err);
}
}
log(
'Gateway sync complete: synced=%d skipped=%d failed=%d',
totalSynced,
totalSkipped,
totalFailed,
);
}
async stop(): Promise<void> {