feat: support message gateway (#13761)

* feat: support message gateway

* feat: sync message gateway connections

* chore: add cloudflare http v2

* fix: typing interval

* feat: add connnectionMode to gateway

* chore: add applicationId when connect

* fix: judge typing supoort by  triggerTyping implementation

* fix: skip message gateway & start connnections

* fix: qq platform hint

* chore: skip webhook mode in gateway connection

* fix: test case

* fix:  message gateway check

* chore: add failaure case post

* fix: test case

* feat: add GatewayManager for webhook-mode platforms
This commit is contained in:
Rdmclin2
2026-04-13 01:27:54 +08:00
committed by GitHub
parent 3ad124ac4f
commit 73be58ba12
19 changed files with 659 additions and 27 deletions
+9
View File
@@ -408,3 +408,12 @@ OPENAI_API_KEY=sk-xxxxxxxxx
# IMPORTANT: This key is stored server-side only and NEVER exposed to the client
# When this key is set, Klavis integration will be automatically enabled
# KLAVIS_API_KEY=your_klavis_api_key_here
# #######################################
# #### Message Gateway (IM Integration) ##
# #######################################
# URL of the message-gateway Cloudflare Worker for unified IM platform connection management
# When set, LobeHub delegates all platform connections to the external gateway
# MESSAGE_GATEWAY_URL=https://message-gateway.lobehub.com
# MESSAGE_GATEWAY_SERVICE_TOKEN=your_service_token_here
+1 -1
View File
@@ -23,7 +23,7 @@
"channel.connecting": "Connecting...",
"channel.connectionConfig": "Connection Configuration",
"channel.connectionMode": "Connection Mode",
"channel.connectionModeHint": "WebSocket is recommended for new bots. Use Webhook if your bot already has a callback URL configured on QQ Open Platform.",
"channel.connectionModeHint": "WebSocket is recommended for new bots. Use Webhook if your bot already has a callback URL configured.",
"channel.connectionModeWebSocket": "WebSocket",
"channel.connectionModeWebhook": "Webhook",
"channel.copied": "Copied to clipboard",
+1 -1
View File
@@ -23,7 +23,7 @@
"channel.connecting": "连接中...",
"channel.connectionConfig": "连接配置",
"channel.connectionMode": "连接模式",
"channel.connectionModeHint": "新机器人推荐使用 WebSocket。如果你的机器人已在 QQ 开放平台配置了回调地址,请选择 Webhook。",
"channel.connectionModeHint": "新机器人推荐使用 WebSocket。如果你的机器人已配置了回调地址,请选择 Webhook。",
"channel.connectionModeWebSocket": "WebSocket",
"channel.connectionModeWebhook": "Webhook",
"channel.copied": "已复制到剪贴板",
+1 -1
View File
@@ -107,7 +107,7 @@
"test:update": "vitest -u",
"test-app": "vitest run",
"test-app:coverage": "vitest --coverage --silent='passed-only'",
"tunnel:cloudflare": "cloudflared tunnel --url http://localhost:3010",
"tunnel:cloudflare": "cloudflared tunnel --url http://localhost:3010 --protocol http2",
"tunnel:ngrok": "ngrok http http://localhost:3010",
"type-check": "tsgo --noEmit",
"type-check:tsc": "tsc --noEmit",
@@ -123,6 +123,13 @@ export async function GET(request: NextRequest) {
return new Response('Unauthorized', { status: 401 });
}
// When an external message gateway is fully configured (both URL and service
// token), it manages all persistent connections. Skip in-process bot startup
// to avoid duplicate connections.
if (process.env.MESSAGE_GATEWAY_URL && process.env.MESSAGE_GATEWAY_SERVICE_TOKEN) {
return Response.json({ skipped: true, reason: 'using external message gateway' });
}
const platforms = platformRegistry.listPlatforms();
const serverDB = await getServerDB();
+4
View File
@@ -6,11 +6,15 @@ export const getGatewayConfig = () => {
runtimeEnv: {
DEVICE_GATEWAY_SERVICE_TOKEN: process.env.DEVICE_GATEWAY_SERVICE_TOKEN,
DEVICE_GATEWAY_URL: process.env.DEVICE_GATEWAY_URL,
MESSAGE_GATEWAY_SERVICE_TOKEN: process.env.MESSAGE_GATEWAY_SERVICE_TOKEN,
MESSAGE_GATEWAY_URL: process.env.MESSAGE_GATEWAY_URL,
},
server: {
DEVICE_GATEWAY_SERVICE_TOKEN: z.string().optional(),
DEVICE_GATEWAY_URL: z.string().url().optional(),
MESSAGE_GATEWAY_SERVICE_TOKEN: z.string().optional(),
MESSAGE_GATEWAY_URL: z.string().url().optional(),
},
});
};
+5 -4
View File
@@ -4,12 +4,13 @@ export async function register() {
await import('./libs/debug-file-logger');
}
// Auto-start GatewayManager for non-Vercel environments so that
// persistent bots (e.g. Discord gateway, WeChat long-polling) reconnect after server restart.
// Auto-start GatewayManager / sync message-gateway connections on server start.
// - Non-Vercel (Docker, local): always run — persistent bots need reconnection after restart.
// - Vercel: only run when an external message gateway is configured, to sync connection state.
if (
process.env.NEXT_RUNTIME === 'nodejs' &&
!process.env.VERCEL_ENV &&
process.env.DATABASE_URL
process.env.DATABASE_URL &&
(!process.env.VERCEL_ENV || process.env.MESSAGE_GATEWAY_URL)
) {
const { GatewayService } = await import('./server/services/gateway');
const service = new GatewayService();
+1 -1
View File
@@ -112,7 +112,7 @@ export default {
'The App Secret of your bot application. It will be encrypted and stored securely.',
'channel.connectionMode': 'Connection Mode',
'channel.connectionModeHint':
'WebSocket is recommended for new bots. Use Webhook if your bot already has a callback URL configured on QQ Open Platform.',
'WebSocket is recommended for new bots. Use Webhook if your bot already has a callback URL configured.',
'channel.connectionModeWebSocket': 'WebSocket',
'channel.connectionModeWebhook': 'Webhook',
'channel.charLimit': 'Character Limit',
+53 -8
View File
@@ -4,11 +4,13 @@ import type { Message, SentMessage, Thread } from 'chat';
import { emoji } from 'chat';
import debug from 'debug';
import { AgentBotProviderModel } from '@/database/models/agentBotProvider';
import { TopicModel } from '@/database/models/topic';
import { UserModel } from '@/database/models/user';
import type { LobeChatDatabase } from '@/database/type';
import { createAbortError, isAbortError } from '@/server/services/agentRuntime/abort';
import { AiAgentService } from '@/server/services/aiAgent';
import { getMessageGatewayClient } from '@/server/services/gateway/MessageGatewayClient';
import { isQueueAgentRuntimeEnabled } from '@/server/services/queue/impls';
import { SystemAgentService } from '@/server/services/systemAgent';
@@ -232,14 +234,22 @@ export class AgentBridgeService {
AgentBridgeService.clearActiveThread(thread.id);
const errorContent = stopped ? renderStopped(errorMessage) : renderError(errorMessage);
if (progressMessage) {
try {
await progressMessage.edit(
stopped ? renderStopped(errorMessage) : renderError(errorMessage),
);
await progressMessage.edit(errorContent);
} catch (editError) {
log('finishStartupFailure: failed to edit progress message: %O', editError);
}
} else {
// No placeholder message (e.g. gateway typing mode) — post a new message
// so the user still sees the error instead of a silently frozen typing indicator.
try {
await thread.post(errorContent);
} catch (postError) {
log('finishStartupFailure: failed to post error message: %O', postError);
}
}
await this.removeReceivedReaction(thread, userMessage);
@@ -524,13 +534,48 @@ export class AgentBridgeService {
const aiAgentService = new AiAgentService(this.db, this.userId);
const timezone = await this.loadTimezone();
await safeSideEffect(() => thread.startTyping(), 'startTyping (executeWithWebhooks)');
// When the message-gateway is configured AND the platform supports typing
// indicators, skip the ack/progress message and rely on the gateway's
// alarm-based typing indicator throughout AI generation.
// Posting an ack message cancels platform-level typing (e.g. Discord), and the
// gateway typing makes ack redundant as user feedback.
// For platforms without typing support (no triggerTyping on messenger), the
// gateway typing is invisible, so we still send an ack message as user feedback.
const gwClient = getMessageGatewayClient();
const platformSupportsTyping =
client && botContext?.platformThreadId
? !!client.getMessenger(botContext.platformThreadId).triggerTyping
: true;
const useGatewayTyping = gwClient.isConfigured && platformSupportsTyping;
let progressMessage: SentMessage | undefined;
try {
progressMessage = await thread.post(renderStart(userMessage.text, { timezone }));
} catch (error) {
log('executeWithCallback: failed to post initial placeholder message: %O', error);
if (useGatewayTyping) {
log('executeWithWebhooks: using gateway typing, skipping ack message');
// Platform typing (best-effort, must not block AI generation)
await safeSideEffect(() => thread.startTyping(), 'startTyping (executeWithWebhooks)');
// Start gateway typing immediately so the alarm keeps it alive through
// the entire AI generation (platform typing expires after ~10s).
if (botContext?.platformThreadId && botContext?.applicationId) {
const platform = botContext.platformThreadId.split(':')[0];
AgentBotProviderModel.findByPlatformAndAppId(this.db, platform, botContext.applicationId)
.then((row) => {
if (row?.id) {
return gwClient.startTyping(row.id, botContext.platformThreadId!);
}
})
.catch((err) => {
log('executeWithWebhooks: gateway startTyping failed: %O', err);
});
}
} else {
await safeSideEffect(() => thread.startTyping(), 'startTyping (executeWithWebhooks)');
try {
progressMessage = await thread.post(renderStart(userMessage.text, { timezone }));
} catch (error) {
log('executeWithWebhooks: failed to post initial placeholder message: %O', error);
}
}
const { files, warnings: fileWarnings } = await this.resolveFiles(userMessage, client);
+36 -3
View File
@@ -5,6 +5,7 @@ import { TopicModel } from '@/database/models/topic';
import { type LobeChatDatabase } from '@/database/type';
import { getAgentRuntimeRedisClient } from '@/server/modules/AgentRuntime/redis';
import { KeyVaultsGateKeeper } from '@/server/modules/KeyVaultsEncrypt';
import { getMessageGatewayClient } from '@/server/services/gateway/MessageGatewayClient';
import { SystemAgentService } from '@/server/services/systemAgent';
import { AgentBridgeService } from './AgentBridgeService';
@@ -76,7 +77,7 @@ export class BotCallbackService {
const { type, applicationId, platformThreadId, progressMessageId } = body;
const platform = platformThreadId.split(':')[0];
const { client, messenger, charLimit, settings } = await this.createMessenger(
const { client, connectionId, messenger, charLimit, settings } = await this.createMessenger(
platform,
applicationId,
platformThreadId,
@@ -89,7 +90,16 @@ export class BotCallbackService {
if (canEdit && progressMessageId && settings.displayToolCalls !== false) {
await this.handleStep(body, messenger, progressMessageId, client);
}
// Only renew typing when more steps are expected. The final step
// (shouldContinue=false) may arrive after the completion callback
// via async delivery (QStash), which would restart typing after stop.
if (body.shouldContinue) {
this.renewGatewayTyping(connectionId, platformThreadId);
}
} else if (type === 'completion') {
// Stop typing on the gateway
this.stopGatewayTyping(connectionId, platformThreadId);
await this.handleCompletion(
body,
messenger,
@@ -113,6 +123,7 @@ export class BotCallbackService {
platformThreadId: string,
): Promise<{
charLimit?: number;
connectionId: string;
client: PlatformClient;
messenger: PlatformMessenger;
settings: Record<string, unknown>;
@@ -156,7 +167,7 @@ export class BotCallbackService {
});
const messenger = client.getMessenger(platformThreadId);
return { charLimit, client, messenger, settings };
return { charLimit, connectionId: row.id, messenger, client, settings };
}
private async handleStep(
@@ -201,7 +212,7 @@ export class BotCallbackService {
try {
await messenger.editMessage(progressMessageId, progressText);
if (!isLlmFinalResponse) {
await messenger.triggerTyping();
await messenger.triggerTyping?.();
}
} catch (error) {
log('handleStep: failed to edit progress message: %O', error);
@@ -299,6 +310,28 @@ export class BotCallbackService {
}
}
/**
* Renew typing on the message-gateway. Each POST resets the 30s auto-stop timeout.
* Fire-and-forget — typing is best-effort.
*/
private renewGatewayTyping(connectionId: string, platformThreadId: string): void {
const client = getMessageGatewayClient();
if (!client.isConfigured) return;
client.startTyping(connectionId, platformThreadId).catch((err) => {
log('renewGatewayTyping failed: %O', err);
});
}
private stopGatewayTyping(connectionId: string, platformThreadId: string): void {
const client = getMessageGatewayClient();
if (!client.isConfigured) return;
client.stopTyping(connectionId, platformThreadId).catch((err) => {
log('stopGatewayTyping failed: %O', err);
});
}
private summarizeTopicTitle(body: BotCallbackBody, messenger: PlatformMessenger): void {
const { reason, topicId, userId, userPrompt, lastAssistantContent, threadName } = body;
if (
@@ -31,6 +31,10 @@ vi.mock('@/server/services/aiAgent', () => ({
})),
}));
vi.mock('@/server/services/gateway/MessageGatewayClient', () => ({
getMessageGatewayClient: vi.fn().mockReturnValue({ isConfigured: false }),
}));
vi.mock('@/server/services/queue/impls', () => ({
isQueueAgentRuntimeEnabled: mockIsQueueAgentRuntimeEnabled,
}));
@@ -90,7 +94,7 @@ function createClient() {
return {
createAdapter: vi.fn(),
extractChatId: vi.fn(),
getMessenger: vi.fn(),
getMessenger: vi.fn().mockReturnValue({ triggerTyping: vi.fn() }),
id: 'discord',
parseMessageId: vi.fn(),
shouldSubscribe: vi.fn().mockReturnValue(true),
@@ -74,6 +74,14 @@ vi.mock('../AgentBridgeService', () => ({
},
}));
vi.mock('@/server/services/gateway/MessageGatewayClient', () => ({
getMessageGatewayClient: vi.fn().mockReturnValue({
isConfigured: false,
startTyping: vi.fn().mockResolvedValue(undefined),
stopTyping: vi.fn().mockResolvedValue(undefined),
}),
}));
vi.mock('@/server/services/systemAgent', () => ({
SystemAgentService: vi.fn().mockImplementation(() => ({
generateTopicTitle: mockGenerateTopicTitle,
@@ -64,7 +64,6 @@ function createMessenger(
createMessage: (content) => api.sendMessage(chatId, content).then(() => {}),
editMessage: (messageId, content) => api.editMessage(messageId, content).then(() => {}),
removeReaction: () => Promise.resolve(),
triggerTyping: () => Promise.resolve(),
};
}
@@ -271,7 +271,6 @@ class QQGatewayClient implements PlatformClient {
sendQQMessage(api, threadType, targetId, content),
// QQ Bot API doesn't support reactions or typing
removeReaction: () => Promise.resolve(),
triggerTyping: () => Promise.resolve(),
};
}
@@ -369,7 +368,6 @@ class QQWebhookClient implements PlatformClient {
createMessage: (content) => sendQQMessage(api, threadType, targetId, content),
editMessage: (_messageId, content) => sendQQMessage(api, threadType, targetId, content),
removeReaction: () => Promise.resolve(),
triggerTyping: () => Promise.resolve(),
};
}
@@ -55,7 +55,6 @@ function createMessenger(config: BotProviderConfig, platformThreadId: string): P
: slack.postMessage(channelId, content).then(() => {}),
editMessage: (messageId, content) => slack.updateMessage(channelId, messageId, content),
removeReaction: (messageId, emoji) => slack.removeReaction(channelId, messageId, emoji),
triggerTyping: () => Promise.resolve(),
};
}
+1 -1
View File
@@ -80,7 +80,7 @@ export interface PlatformMessenger {
createMessage: (content: string) => Promise<void>;
editMessage: (messageId: string, content: string) => Promise<void>;
removeReaction: (messageId: string, emoji: string) => Promise<void>;
triggerTyping: () => Promise<void>;
triggerTyping?: () => Promise<void>;
updateThreadName?: (name: string) => Promise<void>;
}
@@ -0,0 +1,178 @@
import debug from 'debug';
import { gatewayEnv } from '@/envs/gateway';
const log = debug('lobe-server:message-gateway-client');
// ─── Types ───
export interface MessageGatewayConnectionConfig {
/** Platform application ID (e.g., Feishu appId, QQ appId) */
applicationId?: string;
connectionId: string;
/** Preferred connection mode (e.g., "webhook", "websocket"). Falls back to platform default if omitted. */
connectionMode?: string;
credentials: Record<string, unknown>;
platform: string;
userId: string;
webhookPath: string;
}
export interface MessageGatewayConnectionStatus {
config: { connectionId: string; platform: string } | null;
state: {
connectedAt?: number;
error?: string;
platform: string;
status: 'connected' | 'connecting' | 'disconnected' | 'error';
};
}
export interface MessageGatewayStats {
byPlatform: Record<string, number>;
connections: Array<{
connectionId: string;
platform: string;
state: { status: string };
userId: string;
}>;
total: number;
}
// ─── Client ───
/**
* HTTP client for the message-gateway Cloudflare Worker.
*
* The gateway is a pure connection proxy — it only manages persistent
* connections (WebSocket/long-polling) and forwards inbound events to
* LobeHub's webhook. Outbound messaging is NOT routed through the gateway;
* LobeHub calls platform REST APIs directly.
*/
export class MessageGatewayClient {
private baseUrl: string;
private serviceToken: string;
constructor(baseUrl?: string, serviceToken?: string) {
if (baseUrl !== undefined) {
this.baseUrl = baseUrl;
this.serviceToken = serviceToken || '';
} else {
this.baseUrl = gatewayEnv.MESSAGE_GATEWAY_URL || '';
this.serviceToken = gatewayEnv.MESSAGE_GATEWAY_SERVICE_TOKEN || '';
}
}
get isConfigured(): boolean {
return !!(this.baseUrl && this.serviceToken);
}
// ─── Connection Management ───
async connect(config: MessageGatewayConnectionConfig): Promise<{ status: string }> {
log('Connecting %s:%s (platform=%s)', config.connectionId, config.userId, config.platform);
const res = await this.post('/api/connections', { config });
if (!res.ok) {
const error = await res.text();
log('Connect failed: %s', error);
throw new Error(`message-gateway connect failed (${res.status}): ${error}`);
}
return res.json();
}
async disconnect(connectionId: string): Promise<{ status: string }> {
log('Disconnecting %s', connectionId);
const res = await this.fetch(`/api/connections/${encodeURIComponent(connectionId)}`, {
method: 'DELETE',
});
if (!res.ok) {
const error = await res.text();
log('Disconnect failed: %s', error);
throw new Error(`message-gateway disconnect failed (${res.status}): ${error}`);
}
return res.json();
}
// ─── Typing ───
async startTyping(connectionId: string, platformThreadId: string): Promise<void> {
await this.post(`/api/connections/${encodeURIComponent(connectionId)}/typing`, {
platformThreadId,
});
}
async stopTyping(connectionId: string, platformThreadId: string): Promise<void> {
await this.fetch(`/api/connections/${encodeURIComponent(connectionId)}/typing`, {
body: JSON.stringify({ platformThreadId }),
headers: { 'Content-Type': 'application/json' },
method: 'DELETE',
});
}
// ─── Status & Admin ───
async getStatus(connectionId: string): Promise<MessageGatewayConnectionStatus> {
const res = await this.fetch(`/api/connections/${encodeURIComponent(connectionId)}/status`);
if (!res.ok) {
throw new Error(`message-gateway status failed (${res.status})`);
}
return res.json();
}
async getStats(): Promise<MessageGatewayStats> {
const res = await this.fetch('/api/admin/stats');
if (!res.ok) {
throw new Error(`message-gateway stats failed (${res.status})`);
}
return res.json();
}
// ─── Internal HTTP ───
private async fetch(path: string, init?: RequestInit): Promise<Response> {
if (!this.isConfigured) {
throw new Error(
'MessageGatewayClient not configured: set MESSAGE_GATEWAY_URL and MESSAGE_GATEWAY_SERVICE_TOKEN',
);
}
const url = `${this.baseUrl}${path}`;
return globalThis.fetch(url, {
...init,
headers: {
...init?.headers,
Authorization: `Bearer ${this.serviceToken}`,
},
});
}
private async post(path: string, body: unknown): Promise<Response> {
return this.fetch(path, {
body: JSON.stringify(body),
headers: { 'Content-Type': 'application/json' },
method: 'POST',
});
}
}
// ─── Singleton ───
let _client: MessageGatewayClient | undefined;
export function getMessageGatewayClient(): MessageGatewayClient {
if (!_client) {
_client = new MessageGatewayClient();
}
return _client;
}
@@ -0,0 +1,145 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { MessageGatewayClient } from '../MessageGatewayClient';
describe('MessageGatewayClient', () => {
let client: MessageGatewayClient;
beforeEach(() => {
client = new MessageGatewayClient('https://message-gateway.test.com', 'test-service-token');
});
describe('isConfigured', () => {
it('returns true when both url and token are set', () => {
expect(client.isConfigured).toBe(true);
});
it('returns false when url is missing', () => {
const c = new MessageGatewayClient('', 'token');
expect(c.isConfigured).toBe(false);
});
it('returns false when token is missing', () => {
const c = new MessageGatewayClient('https://example.com', '');
expect(c.isConfigured).toBe(false);
});
});
describe('connect', () => {
it('calls POST /api/connections with config', async () => {
const mockFetch = vi.fn().mockResolvedValue({
json: () => Promise.resolve({ status: 'connected' }),
ok: true,
});
vi.stubGlobal('fetch', mockFetch);
const result = await client.connect({
connectionId: 'conn-1',
credentials: { botToken: 'test' },
platform: 'discord',
userId: 'user-1',
webhookPath: '/api/agent/webhooks/discord/app1',
});
expect(mockFetch).toHaveBeenCalledWith(
'https://message-gateway.test.com/api/connections',
expect.objectContaining({ method: 'POST' }),
);
expect(result.status).toBe('connected');
vi.unstubAllGlobals();
});
it('throws on non-ok response', async () => {
vi.stubGlobal(
'fetch',
vi.fn().mockResolvedValue({
ok: false,
status: 500,
text: () => Promise.resolve('Internal error'),
}),
);
await expect(
client.connect({
connectionId: 'conn-1',
credentials: {},
platform: 'discord',
userId: 'user-1',
webhookPath: '/test',
}),
).rejects.toThrow('connect failed (500)');
vi.unstubAllGlobals();
});
});
describe('disconnect', () => {
it('calls DELETE /api/connections/:id', async () => {
const mockFetch = vi.fn().mockResolvedValue({
json: () => Promise.resolve({ status: 'disconnected' }),
ok: true,
});
vi.stubGlobal('fetch', mockFetch);
await client.disconnect('conn-1');
expect(mockFetch).toHaveBeenCalledWith(
'https://message-gateway.test.com/api/connections/conn-1',
expect.objectContaining({ method: 'DELETE' }),
);
vi.unstubAllGlobals();
});
});
describe('getStatus', () => {
it('calls GET /api/connections/:id/status', async () => {
const mockFetch = vi.fn().mockResolvedValue({
json: () =>
Promise.resolve({
config: { connectionId: 'conn-1', platform: 'discord' },
state: { platform: 'discord', status: 'connected' },
}),
ok: true,
});
vi.stubGlobal('fetch', mockFetch);
const status = await client.getStatus('conn-1');
expect(status.state.status).toBe('connected');
vi.unstubAllGlobals();
});
});
describe('getStats', () => {
it('calls GET /api/admin/stats', async () => {
const mockFetch = vi.fn().mockResolvedValue({
json: () => Promise.resolve({ byPlatform: { discord: 2 }, connections: [], total: 2 }),
ok: true,
});
vi.stubGlobal('fetch', mockFetch);
const stats = await client.getStats();
expect(stats.total).toBe(2);
vi.unstubAllGlobals();
});
});
describe('unconfigured client', () => {
it('throws when calling methods without configuration', async () => {
const unconfigured = new MessageGatewayClient('', '');
await expect(
unconfigured.connect({
connectionId: 'test',
credentials: {},
platform: 'discord',
userId: 'user',
webhookPath: '/test',
}),
).rejects.toThrow('not configured');
});
});
});
+204 -2
View File
@@ -8,6 +8,7 @@ import type { ConnectionMode } from '../bot/platforms';
import { getEffectiveConnectionMode, platformRegistry } from '../bot/platforms';
import { BOT_CONNECT_QUEUE_EXPIRE_MS, BotConnectQueue } from './botConnectQueue';
import { createGatewayManager, getGatewayManager } from './GatewayManager';
import { getMessageGatewayClient } from './MessageGatewayClient';
import { BOT_RUNTIME_STATUSES, updateBotRuntimeStatus } from './runtimeStatus';
const log = debug('lobe-server:service:gateway');
@@ -15,7 +16,21 @@ const log = debug('lobe-server:service:gateway');
const isVercel = !!process.env.VERCEL_ENV;
export class GatewayService {
/**
* Check if the external message-gateway is configured.
* When enabled, all platforms are registered on the gateway for
* connection management and typing persistence.
*/
get useMessageGateway(): boolean {
return getMessageGatewayClient().isConfigured;
}
async ensureRunning(): Promise<void> {
if (this.useMessageGateway) {
await this.syncGatewayConnections();
return;
}
const existing = getGatewayManager();
if (existing?.isRunning) {
log('GatewayManager already running');
@@ -28,6 +43,91 @@ export class GatewayService {
log('GatewayManager started');
}
/**
* Sync all enabled bots to the external message-gateway.
* Called on startup to recover connections after LobeHub restarts.
*/
private async syncGatewayConnections(): Promise<void> {
const { getServerDB } = await import('@/database/core/db-adaptor');
const { AgentBotProviderModel } = await import('@/database/models/agentBotProvider');
const { KeyVaultsGateKeeper } = await import('@/server/modules/KeyVaultsEncrypt');
const client = getMessageGatewayClient();
const serverDB = await getServerDB();
const gateKeeper = await KeyVaultsGateKeeper.initWithEnvKey();
// Sync all registered platforms
for (const definition of platformRegistry.listPlatforms()) {
const platform = definition.id;
try {
const providers = await AgentBotProviderModel.findEnabledByPlatform(
serverDB,
platform,
gateKeeper,
);
log('Gateway sync: found %d enabled providers for %s', providers.length, platform);
for (const provider of providers) {
try {
const definition = platformRegistry.getPlatform(platform);
const connectionMode = getEffectiveConnectionMode(definition, provider.settings);
// Webhook-mode platforms don't need persistent gateway connections.
// Run the platform client locally via GatewayManager instead
// (e.g. Telegram setWebhook, QQ credential verification).
if (connectionMode === 'webhook') {
const manager = createGatewayManager({
definitions: platformRegistry.listPlatforms(),
});
await manager.startClient(platform, provider.applicationId, provider.userId);
log(
'Gateway sync: started webhook-mode %s:%s locally',
platform,
provider.applicationId,
);
continue;
}
// For persistent connections, check gateway status before reconnecting
try {
const status = await client.getStatus(provider.id);
if (status.state.status === 'connected') {
log('Gateway sync: %s already connected, skipping', provider.id);
continue;
}
} catch {
// Status check failed — try to connect
}
const webhookPath = `/api/agent/webhooks/${platform}/${provider.applicationId}`;
await client.connect({
applicationId: provider.applicationId,
connectionId: provider.id,
connectionMode,
credentials: provider.credentials,
platform,
userId: provider.userId,
webhookPath,
});
await updateBotRuntimeStatus({
applicationId: provider.applicationId,
platform,
status: BOT_RUNTIME_STATUSES.connected,
});
log('Gateway sync: connected %s:%s', platform, provider.applicationId);
} catch (err) {
log('Gateway sync: failed to connect %s:%s: %O', platform, provider.applicationId, err);
}
}
} catch (err) {
log('Gateway sync: error syncing platform %s: %O', platform, err);
}
}
}
async stop(): Promise<void> {
const manager = getGatewayManager();
if (!manager) return;
@@ -41,6 +141,11 @@ export class GatewayService {
applicationId: string,
userId: string,
): Promise<'started' | 'queued'> {
if (this.useMessageGateway) {
return this.startClientViaGateway(platform, applicationId, userId);
}
// ─── Legacy: in-process connection management ───
if (isVercel) {
// Load the provider so we can resolve per-provider connection mode.
// The platform default is only a fallback — Slack/Feishu (default websocket)
@@ -72,8 +177,6 @@ export class GatewayService {
return 'queued';
}
// Webhook-based platforms only need a single HTTP call,
// so we can run directly in a Vercel serverless function.
const manager = createGatewayManager({ definitions: platformRegistry.listPlatforms() });
await manager.startClient(platform, applicationId, userId);
log('Started client %s:%s (direct)', platform, applicationId);
@@ -93,6 +196,11 @@ export class GatewayService {
}
async stopClient(platform: string, applicationId: string, userId?: string): Promise<void> {
if (this.useMessageGateway) {
return this.stopClientViaGateway(platform, applicationId);
}
// ─── Legacy: in-process connection management ───
if (isVercel) {
// Without a userId we cannot resolve per-provider settings; fall back to the
// platform default to decide if a queue cleanup is even worth attempting.
@@ -127,4 +235,98 @@ export class GatewayService {
status: BOT_RUNTIME_STATUSES.disconnected,
});
}
// ─── External Message Gateway ───
private async startClientViaGateway(
platform: string,
applicationId: string,
userId: string,
): Promise<'started'> {
const client = getMessageGatewayClient();
const { getServerDB } = await import('@/database/core/db-adaptor');
const { AgentBotProviderModel } = await import('@/database/models/agentBotProvider');
const { KeyVaultsGateKeeper } = await import('@/server/modules/KeyVaultsEncrypt');
const serverDB = await getServerDB();
const gateKeeper = await KeyVaultsGateKeeper.initWithEnvKey();
const model = new AgentBotProviderModel(serverDB, userId, gateKeeper);
const provider = await model.findEnabledByApplicationId(platform, applicationId);
if (!provider) {
log('No enabled provider found for %s:%s', platform, applicationId);
throw new Error(`No enabled provider found for ${platform}:${applicationId}`);
}
const definition = platformRegistry.getPlatform(platform);
const connectionMode = getEffectiveConnectionMode(definition, provider.settings);
// Webhook-mode platforms don't need persistent gateway connections.
// Run the platform client locally via GatewayManager so each platform can
// perform its own initialization (e.g. Telegram calls setWebhook).
if (connectionMode === 'webhook') {
const manager = createGatewayManager({ definitions: platformRegistry.listPlatforms() });
await manager.startClient(platform, applicationId, userId);
log('Started webhook-mode client locally %s:%s', platform, applicationId);
return 'started';
}
const webhookPath = `/api/agent/webhooks/${platform}/${applicationId}`;
await client.connect({
applicationId: provider.applicationId,
connectionId: provider.id,
connectionMode,
credentials: provider.credentials,
platform,
userId,
webhookPath,
});
await updateBotRuntimeStatus({
applicationId,
platform,
status: BOT_RUNTIME_STATUSES.connected,
});
log('Started client via message-gateway %s:%s', platform, applicationId);
return 'started';
}
private async stopClientViaGateway(platform: string, applicationId: string): Promise<void> {
// Stop locally-managed webhook client if it exists (e.g. Telegram deleteWebhook)
const manager = getGatewayManager();
if (manager) {
await manager.stopClient(platform, applicationId);
}
const client = getMessageGatewayClient();
const { getServerDB } = await import('@/database/core/db-adaptor');
const { AgentBotProviderModel } = await import('@/database/models/agentBotProvider');
const serverDB = await getServerDB();
const provider = await AgentBotProviderModel.findByPlatformAndAppId(
serverDB,
platform,
applicationId,
);
if (provider) {
try {
await client.disconnect(provider.id);
} catch (err) {
log('Disconnect via message-gateway failed: %O', err);
}
}
await updateBotRuntimeStatus({
applicationId,
platform,
status: BOT_RUNTIME_STATUSES.disconnected,
});
log('Stopped client via message-gateway %s:%s', platform, applicationId);
}
}