feat(task): auto-ensure qstash schedule for task system (#14771)

*  feat(task): auto-ensure qstash schedule

chore: cleanup code

chore: cleanup code

chore: cleanup code

* chore: migrate qstash init workflow to startServer

chore: migrate qstash init workflow to startServer

* fix: set default QSTASH_URL to eu region, same as SDK

fix: set default QSTASH_URL to eu region, same as SDK
This commit is contained in:
Zhijie He
2026-06-05 02:07:03 +08:00
committed by GitHub
parent f5d78d3d28
commit 25635ddb38
+44
View File
@@ -163,6 +163,47 @@ const startGateway = async () => {
console.error('❌ Gateway: Failed to start after retries.'); console.error('❌ Gateway: Failed to start after retries.');
}; };
// Function to create QStash schedule for dispatching workflow tasks every 10 minutes
const createQstashSchedule = async () => {
const QSTASH_URL = process.env.QSTASH_URL || 'https://qstash-eu-central-1.upstash.io';
const QSTASH_TOKEN = process.env.QSTASH_TOKEN;
if (!QSTASH_TOKEN) {
console.warn('⚠️ QStash: QSTASH_TOKEN not set. Skipping schedule creation.');
return;
}
const APP_URL = process.env.APP_URL;
if (!APP_URL) {
console.warn('⚠️ QStash: APP_URL not set. Skipping schedule creation.');
return;
}
const url = `${QSTASH_URL}/v2/schedules/${APP_URL}/api/workflows/task/schedule-dispatch`;
try {
const res = await fetch(url, {
method: 'POST',
headers: {
'Authorization': `Bearer ${QSTASH_TOKEN}`,
'Content-Type': 'application/json',
'Upstash-Method': 'POST',
'Upstash-Cron': '*/10 * * * *',
'Upstash-Schedule-Id': 'lobe-task-schedule-dispatch',
},
body: JSON.stringify({}),
});
if (res.ok) {
console.log('✅ QStash: Schedule created successfully.');
} else {
console.error(`❌ QStash: Failed to create schedule. Status ${res.status}`);
}
} catch (err) {
console.error('❌ QStash: Error creating schedule:', err);
}
};
// Main function to run the server with optional proxy // Main function to run the server with optional proxy
const runServer = async () => { const runServer = async () => {
const PROXY_URL = process.env.PROXY_URL || ''; // Default empty string to avoid undefined errors const PROXY_URL = process.env.PROXY_URL || ''; // Default empty string to avoid undefined errors
@@ -204,6 +245,9 @@ const runServer = async () => {
// Start gateway in background after server is ready // Start gateway in background after server is ready
startGateway(); startGateway();
// Create QStash schedule for workflow task dispatching
createQstashSchedule();
// Run the server in either database or non-database mode // Run the server in either database or non-database mode
await runServer(); await runServer();
})(); })();