Compare commits

...

1 Commits

Author SHA1 Message Date
Mauricio Siu 2da2b2dd39 refactor(queues): migrate from BullMQ to p-limit for deployment management
This commit introduces a new queue system using p-limit, addressing resource issues and improving job cancellation capabilities. Key changes include:
- Removal of Redis dependency, allowing for in-memory queue management.
- Implementation of per-server queues with ordered processing based on server concurrency settings.
- Addition of helper functions for job management and status retrieval, ensuring backward compatibility with existing API endpoints.
- Updates to database schema to support server concurrency settings.

The legacy BullMQ code has been retained for compatibility but is no longer in active use.
2025-08-29 00:08:33 -06:00
14 changed files with 7214 additions and 171 deletions
@@ -79,7 +79,7 @@ export const ShowGeneralApplication = ({ applicationId }: Props) => {
>
<Button
variant="default"
isLoading={data?.applicationStatus === "running"}
// isLoading={data?.applicationStatus === "running"}
className="flex items-center gap-1.5 group focus-visible:ring-2 focus-visible:ring-offset-2"
>
<Tooltip>
@@ -0,0 +1,2 @@
ALTER TABLE "user_temp" ADD COLUMN "serverConcurrency" integer DEFAULT 1 NOT NULL;--> statement-breakpoint
ALTER TABLE "server" ADD COLUMN "concurrency" integer DEFAULT 1 NOT NULL;
File diff suppressed because it is too large Load Diff
+7
View File
@@ -750,6 +750,13 @@
"when": 1754912062243,
"tag": "0106_purple_maggott",
"breakpoints": true
},
{
"idx": 107,
"version": "7",
"when": 1756436825081,
"tag": "0107_calm_power_pack",
"breakpoints": true
}
]
}
+1 -1
View File
@@ -97,7 +97,6 @@
"better-auth": "v1.2.8-beta.7",
"bl": "6.0.11",
"boxen": "^7.1.1",
"bullmq": "5.4.2",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"cmdk": "^0.2.1",
@@ -126,6 +125,7 @@
"nodemailer": "6.9.14",
"octokit": "3.1.2",
"otpauth": "^9.4.0",
"p-limit": "^7.1.1",
"pino": "9.4.0",
"pino-pretty": "11.2.2",
"postgres": "3.4.4",
@@ -54,7 +54,11 @@ import {
applications,
} from "@/server/db/schema";
import type { DeploymentJob } from "@/server/queues/queue-types";
import { cleanQueuesByApplication, myQueue } from "@/server/queues/queueSetup";
import {
addJobWithUserContext,
cleanQueuesByApplication,
myQueue,
} from "@/server/queues/queueSetup";
import { deploy } from "@/server/utils/deploy";
import { uploadFileSchema } from "@/utils/schema";
@@ -668,14 +672,7 @@ export const applicationRouter = createTRPCRouter({
return true;
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
);
await addJobWithUserContext({ ...jobData }, ctx.user.id);
}),
cleanQueues: protectedProcedure
+104
View File
@@ -0,0 +1,104 @@
# Queue System Migration - BullMQ to p-limit
This directory contains the new queue system that replaces BullMQ with [p-limit](https://github.com/sindresorhus/p-limit) for deployment queues.
## Why the Migration?
- **Resource Issues**: Users experienced freezing during builds due to resource constraints
- **Cancellation Problems**: BullMQ workers couldn't be properly canceled when Docker processes restart
- **Retry Loops**: Unwanted automatic retries when processes are killed
## New Architecture
### Key Features
1. **Per-Server Queues**: Deployments are grouped by server (local "dokploy-server" or remote servers)
2. **Ordered Processing**: Within each server, deployments are processed based on server concurrency settings
3. **Global User Concurrency**: User's `serverConcurrency` controls total deployments across all servers
4. **Proper Cancellation**: Jobs can be canceled using AbortController
5. **No Redis Dependency**: In-memory queues eliminate Redis dependency issues
### Files
- `service-queue.ts` - New p-limit based queue implementation
- `queueSetup.ts` - Compatibility layer for existing code
- `deployments-queue.ts` - Legacy compatibility exports
- `queue-types.ts` - Shared type definitions
## Usage Examples
```typescript
import { addJobWithUserContext, cancelDeploymentJobs, getDeploymentQueueStatus } from './queueSetup';
// Add a deployment job with user context (recommended for API routes)
const result = await addJobWithUserContext({
applicationType: 'application',
applicationId: '123',
type: 'deploy',
titleLog: 'Deploying app',
descriptionLog: 'Starting deployment',
serverId: 'server-456' // Optional - for remote deployments
}, 'user-id-789'); // User ID for concurrency settings
// Cancel jobs for a service
const cancelled = cancelDeploymentJobs('app-123');
// Get queue status
const status = getDeploymentQueueStatus('app-123');
```
### Database-Driven Concurrency
The system now automatically reads concurrency settings from the database:
1. **Global User Concurrency**: From `users_temp.serverConcurrency` field
- Controls the **TOTAL** number of deployments that can run simultaneously for a user
- Example: If `serverConcurrency = 1`, only 1 deployment across ALL services at a time
- Example: If `serverConcurrency = 3`, maximum 3 deployments can run simultaneously across all services
2. **Server Concurrency**: From `server.concurrency` field
- Controls how many deployments can run simultaneously **on a specific server**
- Only applies when deploying to remote servers (`serverId` is present)
- Example: Server A can handle 2 concurrent deployments, Server B can handle 1
### Concurrency Hierarchy
```
User Global Limit (users_temp.serverConcurrency)
├── dokploy-server (local deployments)
│ ├── App A deployment
│ ├── App B deployment
│ └── Compose C deployment
├── remote-server-1 (server.concurrency = 2)
│ ├── App D deployment
│ └── App E deployment
└── remote-server-2 (server.concurrency = 1)
└── App F deployment
```
**Example Scenarios:**
- **User has `serverConcurrency = 1`**: Only 1 deployment total across ALL servers
- **User has `serverConcurrency = 3`**: Maximum 3 deployments simultaneously across all servers
- **Local server**: All local apps/compose share the "dokploy-server" queue
- **Remote server with `concurrency = 2`**: That server can handle up to 2 concurrent deployments
- **Queue grouping**: `app-123` and `app-456` on same server share the same queue
## Configuration
- **Global Concurrency**: Set how many services can deploy simultaneously
- **Service Concurrency**: Each service processes 1 deployment at a time (FIFO)
```typescript
import { setGlobalConcurrency } from './service-queue';
// Allow 5 services to deploy simultaneously
setGlobalConcurrency(5);
```
## Migration Notes
- The schedules app still uses BullMQ for cron/repeatable jobs (different use case)
- All existing API endpoints work unchanged due to compatibility layer
- No breaking changes to existing functionality
- Improved resource usage and cancellation capabilities
+54 -118
View File
@@ -1,122 +1,58 @@
import {
deployApplication,
deployCompose,
deployPreviewApplication,
deployRemoteApplication,
deployRemoteCompose,
deployRemotePreviewApplication,
rebuildApplication,
rebuildCompose,
rebuildRemoteApplication,
rebuildRemoteCompose,
updateApplicationStatus,
updateCompose,
updatePreviewDeployment,
} from "@dokploy/server";
import { type Job, Worker } from "bullmq";
import type { DeploymentJob } from "./queue-types";
import { redisConfig } from "./redis-connection";
// This file is kept for backward compatibility but now uses the new service-queue system
// The actual queue logic has been moved to service-queue.ts using p-limit
export const deploymentWorker = new Worker(
"deployments",
async (job: Job<DeploymentJob>) => {
try {
if (job.data.applicationType === "application") {
await updateApplicationStatus(job.data.applicationId, "running");
import { serviceQueueManager } from "./service-queue";
if (job.data.server) {
if (job.data.type === "redeploy") {
await rebuildRemoteApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
} else if (job.data.type === "deploy") {
await deployRemoteApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
}
} else {
if (job.data.type === "redeploy") {
await rebuildApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
} else if (job.data.type === "deploy") {
await deployApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
}
}
} else if (job.data.applicationType === "compose") {
await updateCompose(job.data.composeId, {
composeStatus: "running",
});
if (job.data.server) {
if (job.data.type === "redeploy") {
await rebuildRemoteCompose({
composeId: job.data.composeId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
} else if (job.data.type === "deploy") {
await deployRemoteCompose({
composeId: job.data.composeId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
}
} else {
if (job.data.type === "deploy") {
await deployCompose({
composeId: job.data.composeId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
} else if (job.data.type === "redeploy") {
await rebuildCompose({
composeId: job.data.composeId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
}
}
} else if (job.data.applicationType === "application-preview") {
await updatePreviewDeployment(job.data.previewDeploymentId, {
previewStatus: "running",
});
if (job.data.server) {
if (job.data.type === "deploy") {
await deployRemotePreviewApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
previewDeploymentId: job.data.previewDeploymentId,
});
}
} else {
if (job.data.type === "deploy") {
await deployPreviewApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
previewDeploymentId: job.data.previewDeploymentId,
});
}
}
}
} catch (error) {
console.log("Error", error);
}
// Legacy compatibility - this is no longer used but kept to avoid breaking imports
export const deploymentWorker = {
run: async () => {
console.log(
"Legacy deploymentWorker.run() called - now using service-queue system",
);
// The service queue manager starts automatically, no need to do anything
return Promise.resolve();
},
{
autorun: false,
connection: redisConfig,
close: async () => {
console.log("Legacy deploymentWorker.close() called");
return Promise.resolve();
},
);
};
// Legacy exports for backward compatibility
export const getWorkersMap = () => {
console.warn(
"getWorkersMap() is deprecated - use serviceQueueManager instead",
);
return {};
};
export const getWorker = (_serverId?: string) => {
console.warn("getWorker() is deprecated - use serviceQueueManager instead");
return undefined;
};
export const createDeploymentWorker = (defaultConcurrency = 1) => {
console.warn(
"createDeploymentWorker() is deprecated - use serviceQueueManager instead",
);
serviceQueueManager.setGlobalConcurrency(defaultConcurrency);
return deploymentWorker;
};
export const createServerDeploymentWorker = (
_serverId: string,
_concurrency = 1,
) => {
console.warn(
"createServerDeploymentWorker() is deprecated - use serviceQueueManager instead",
);
// The new system automatically creates queues per service, no need for explicit worker creation
return deploymentWorker;
};
export const removeServerDeploymentWorker = (serverId: string) => {
console.warn(
"removeServerDeploymentWorker() is deprecated - use removeServiceQueue instead",
);
serviceQueueManager.removeServiceQueue(serverId);
};
+88 -31
View File
@@ -1,44 +1,101 @@
import { Queue } from "bullmq";
import { redisConfig } from "./redis-connection";
import type { DeploymentJob } from "./queue-types";
import {
addDeploymentJob,
cancelDeploymentJobs,
getDeploymentQueueStatus,
setGlobalConcurrency,
} from "./service-queue";
const myQueue = new Queue("deployments", {
connection: redisConfig,
});
// Default queue name for local deployments
export const DEFAULT_QUEUE = "default";
process.on("SIGTERM", () => {
myQueue.close();
process.exit(0);
});
// Initialize with default concurrency of 3 services
setGlobalConcurrency(3);
myQueue.on("error", (error) => {
if ((error as any).code === "ECONNREFUSED") {
console.error(
"Make sure you have installed Redis and it is running.",
error,
);
// Helper function to determine service ID from job data
// Groups deployments by SERVER, not by individual application/compose
const getServiceId = (jobData: DeploymentJob): string => {
// If it has a serverId, group by that server
if (jobData.serverId) {
return jobData.serverId;
}
});
// For local deployments (no serverId), group all under the main Dokploy server
return "dokploy-server";
};
// Compatibility functions to replace BullMQ usage
export const myQueue = {
add: async (
_name: string,
jobData: DeploymentJob,
_options?: any,
userId?: string,
) => {
const serviceId = getServiceId(jobData);
const jobId = await addDeploymentJob(serviceId, jobData, userId);
console.log(`Added deployment job ${jobId} to service ${serviceId}`);
return { id: jobId };
},
close: () => {
console.log("Service queue manager shutdown initiated");
return Promise.resolve();
},
};
export const cleanQueuesByApplication = async (applicationId: string) => {
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
// Cancel jobs for this specific application across all servers
let totalCancelled = 0;
for (const job of jobs) {
if (job?.data?.applicationId === applicationId) {
await job.remove();
console.log(`Removed job ${job.id} for application ${applicationId}`);
}
}
// Check the local Dokploy server
const localCancelled = cancelDeploymentJobs(
"dokploy-server",
applicationId,
undefined,
);
totalCancelled += localCancelled;
// TODO: Also check remote servers if we need to track which servers have this application
// For now, we only clean from the local server queue
console.log(
`Cancelled ${totalCancelled} jobs for application ${applicationId}`,
);
return totalCancelled;
};
export const cleanQueuesByCompose = async (composeId: string) => {
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
// Cancel jobs for this specific compose across all servers
let totalCancelled = 0;
for (const job of jobs) {
if (job?.data?.composeId === composeId) {
await job.remove();
console.log(`Removed job ${job.id} for compose ${composeId}`);
}
}
// Check the local Dokploy server
const localCancelled = cancelDeploymentJobs(
"dokploy-server",
undefined,
composeId,
);
totalCancelled += localCancelled;
// TODO: Also check remote servers if we need to track which servers have this compose
// For now, we only clean from the local server queue
console.log(`Cancelled ${totalCancelled} jobs for compose ${composeId}`);
return totalCancelled;
};
export { myQueue };
// Export queue status for monitoring
export const getQueueStatus = getDeploymentQueueStatus;
// New function to add jobs with user context (for API routes)
export const addJobWithUserContext = async (
jobData: DeploymentJob,
userId?: string,
): Promise<{ id: string }> => {
const serviceId = getServiceId(jobData);
const jobId = await addDeploymentJob(serviceId, jobData, userId);
console.log(
`Added deployment job ${jobId} to service ${serviceId} with user context ${userId || "none"}`,
);
return { id: jobId };
};
@@ -1,8 +0,0 @@
import type { ConnectionOptions } from "bullmq";
export const redisConfig: ConnectionOptions = {
host:
process.env.NODE_ENV === "production"
? process.env.REDIS_HOST || "dokploy-redis"
: "127.0.0.1",
};
+500
View File
@@ -0,0 +1,500 @@
import {
deployApplication,
deployCompose,
deployPreviewApplication,
deployRemoteApplication,
deployRemoteCompose,
deployRemotePreviewApplication,
findServerById,
rebuildApplication,
rebuildCompose,
rebuildRemoteApplication,
rebuildRemoteCompose,
updateApplicationStatus,
updateCompose,
updatePreviewDeployment,
} from "@dokploy/server";
import { db } from "@dokploy/server/db";
import { users_temp } from "@dokploy/server/db/schema";
import { eq } from "drizzle-orm";
import pLimit from "p-limit";
import type { DeploymentJob } from "./queue-types";
// Types for our p-limit based queue system
export interface QueueJob {
id: string;
data: DeploymentJob;
createdAt: Date;
status: "waiting" | "processing" | "completed" | "failed" | "cancelled";
abortController: AbortController;
promise?: Promise<void>;
}
export interface ServiceQueue {
serviceId: string;
jobs: QueueJob[];
limit: ReturnType<typeof pLimit>; // p-limit instance with concurrency 1
}
// Global queue management using p-limit
class ServiceQueueManager {
private queues: Map<string, ServiceQueue> = new Map();
private globalLimit: ReturnType<typeof pLimit>;
private isShuttingDown = false;
constructor(globalConcurrency = 3) {
// Global limit controls how many services can deploy simultaneously
this.globalLimit = pLimit(globalConcurrency);
this.setupShutdownHandlers();
}
// Set global concurrency (how many services can deploy simultaneously)
setGlobalConcurrency(concurrency: number) {
this.globalLimit = pLimit(concurrency);
}
// Get concurrency settings from database
private async getConcurrencySettings(jobData: DeploymentJob): Promise<{
serviceConcurrency: number;
}> {
try {
// Default: Each service processes 1 deployment at a time (FIFO within service)
let serviceConcurrency = 1;
// If it's a server deployment, get server-specific concurrency
// This controls how many deployments can run simultaneously ON THAT SERVER
if (jobData.serverId) {
try {
const serverData = await findServerById(jobData.serverId);
serviceConcurrency = serverData.concurrency || 1;
console.log(
`Server ${jobData.serverId} can handle ${serviceConcurrency} concurrent deployments`,
);
} catch (error) {
console.warn(
`Could not get server concurrency for ${jobData.serverId}, using default: 1`,
);
}
}
return {
serviceConcurrency,
};
} catch (error) {
console.warn(
"Error getting concurrency settings, using defaults:",
error,
);
return {
serviceConcurrency: 1,
};
}
}
// Get or create a queue for a service with dynamic concurrency
private async getOrCreateQueue(
serviceId: string,
jobData?: DeploymentJob,
): Promise<ServiceQueue> {
if (!this.queues.has(serviceId)) {
let serviceConcurrency = 1; // Default
// Get concurrency from database if we have job data
if (jobData) {
const settings = await this.getConcurrencySettings(jobData);
serviceConcurrency = settings.serviceConcurrency;
}
this.queues.set(serviceId, {
serviceId,
jobs: [],
// Service concurrency from database or default to 1
limit: pLimit(serviceConcurrency),
});
console.log(
`Created queue for service ${serviceId} with concurrency: ${serviceConcurrency}`,
);
}
return this.queues.get(serviceId)!;
}
// Add a job to a service queue
async addJob(
serviceId: string,
jobData: DeploymentJob,
userId?: string,
): Promise<string> {
if (this.isShuttingDown) {
throw new Error("Queue manager is shutting down");
}
// Update global concurrency based on user settings if provided
// This controls the TOTAL number of deployments across ALL services for this user
if (userId) {
try {
const userData = await db.query.users_temp.findFirst({
where: eq(users_temp.id, userId),
});
if (userData?.serverConcurrency) {
// This is GLOBAL concurrency - total deployments across all services
this.globalLimit = pLimit(userData.serverConcurrency);
console.log(
`Set GLOBAL concurrency to ${userData.serverConcurrency} deployments total for user ${userId}`,
);
}
} catch (error) {
console.warn(
`Could not get user concurrency settings for ${userId}:`,
error,
);
}
}
const queue = await this.getOrCreateQueue(serviceId, jobData);
const jobId = `${serviceId}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
const job: QueueJob = {
id: jobId,
data: jobData,
createdAt: new Date(),
status: "waiting",
abortController: new AbortController(),
};
queue.jobs.push(job);
console.log(
`Added job ${jobId} to service ${serviceId} queue. Queue length: ${queue.jobs.length}`,
);
// Start processing the job using p-limit
this.processJob(queue, job);
return jobId;
}
// Process a job using both global and service-level p-limit
private processJob(queue: ServiceQueue, job: QueueJob) {
// Use global limit to control cross-service concurrency
job.promise = this.globalLimit(() =>
// Use service limit to ensure ordered processing within service
queue.limit(async () => {
if (job.status === "cancelled" || this.isShuttingDown) {
return;
}
job.status = "processing";
console.log(`Processing job ${job.id} for service ${queue.serviceId}`);
try {
await this.executeJob(job);
job.status = "completed";
console.log(`Completed job ${job.id} for service ${queue.serviceId}`);
} catch (error) {
if (job.abortController.signal.aborted) {
job.status = "cancelled";
console.log(
`Job ${job.id} was cancelled for service ${queue.serviceId}`,
);
} else {
job.status = "failed";
console.error(
`Job ${job.id} failed for service ${queue.serviceId}:`,
error,
);
}
} finally {
// Clean up completed/failed jobs after a delay
setTimeout(() => {
queue.jobs = queue.jobs.filter((j) => j.id !== job.id);
}, 5000);
}
}),
);
}
// Remove/cancel jobs for a specific service
cancelJobsByService(
serviceId: string,
applicationId?: string,
composeId?: string,
): number {
const queue = this.queues.get(serviceId);
if (!queue) return 0;
let cancelledCount = 0;
// Cancel waiting and processing jobs
for (const job of queue.jobs) {
if (job.status === "waiting" || job.status === "processing") {
// Check if this job matches the filter criteria
const matchesApplication = applicationId
? (job.data.applicationType === "application" ||
job.data.applicationType === "application-preview") &&
job.data.applicationId === applicationId
: true;
const matchesCompose = composeId
? job.data.applicationType === "compose" &&
job.data.composeId === composeId
: true;
if (matchesApplication && matchesCompose) {
job.status = "cancelled";
job.abortController.abort();
cancelledCount++;
console.log(`Cancelled job ${job.id} for service ${serviceId}`);
}
}
}
// Remove cancelled jobs from queue immediately
queue.jobs = queue.jobs.filter((job) => job.status !== "cancelled");
return cancelledCount;
}
// Get queue status for a service
getQueueStatus(serviceId: string) {
const queue = this.queues.get(serviceId);
if (!queue) return null;
return {
serviceId,
totalJobs: queue.jobs.length,
waitingJobs: queue.jobs.filter((j) => j.status === "waiting").length,
processingJobs: queue.jobs.filter((j) => j.status === "processing")
.length,
completedJobs: queue.jobs.filter((j) => j.status === "completed").length,
failedJobs: queue.jobs.filter((j) => j.status === "failed").length,
// p-limit queue status
activeCount: queue.limit.activeCount,
pendingCount: queue.limit.pendingCount,
};
}
// Get all queues status
getAllQueuesStatus() {
const status: Record<string, any> = {};
for (const [serviceId] of this.queues) {
status[serviceId] = this.getQueueStatus(serviceId);
}
status.global = {
activeCount: this.globalLimit.activeCount,
pendingCount: this.globalLimit.pendingCount,
concurrency: this.globalLimit.concurrency,
};
return status;
}
// Clear pending jobs from a service queue using p-limit's clearQueue
clearServiceQueue(serviceId: string) {
const queue = this.queues.get(serviceId);
if (queue) {
// Cancel all waiting jobs
for (const job of queue.jobs) {
if (job.status === "waiting") {
job.status = "cancelled";
job.abortController.abort();
}
}
// Clear p-limit's internal queue
queue.limit.clearQueue();
// Remove cancelled jobs
queue.jobs = queue.jobs.filter((job) => job.status !== "cancelled");
console.log(`Cleared service queue for ${serviceId}`);
}
}
private async executeJob(job: QueueJob): Promise<void> {
const { data } = job;
// Check if job was cancelled before execution
if (job.abortController.signal.aborted) {
throw new Error("Job was cancelled");
}
try {
if (data.applicationType === "application") {
await updateApplicationStatus(data.applicationId, "running");
if (data.server) {
if (data.type === "redeploy") {
await rebuildRemoteApplication({
applicationId: data.applicationId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
} else if (data.type === "deploy") {
await deployRemoteApplication({
applicationId: data.applicationId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
}
} else {
if (data.type === "redeploy") {
await rebuildApplication({
applicationId: data.applicationId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
} else if (data.type === "deploy") {
await deployApplication({
applicationId: data.applicationId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
}
}
} else if (data.applicationType === "compose") {
await updateCompose(data.composeId, {
composeStatus: "running",
});
if (data.server) {
if (data.type === "redeploy") {
await rebuildRemoteCompose({
composeId: data.composeId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
} else if (data.type === "deploy") {
await deployRemoteCompose({
composeId: data.composeId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
}
} else {
if (data.type === "deploy") {
await deployCompose({
composeId: data.composeId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
} else if (data.type === "redeploy") {
await rebuildCompose({
composeId: data.composeId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
}
}
} else if (data.applicationType === "application-preview") {
await updatePreviewDeployment(data.previewDeploymentId, {
previewStatus: "running",
});
if (data.server) {
if (data.type === "deploy") {
await deployRemotePreviewApplication({
applicationId: data.applicationId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
previewDeploymentId: data.previewDeploymentId,
});
}
} else {
if (data.type === "deploy") {
await deployPreviewApplication({
applicationId: data.applicationId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
previewDeploymentId: data.previewDeploymentId,
});
}
}
}
} catch (error) {
console.log("Deployment Error", error);
throw error;
}
}
private setupShutdownHandlers() {
const gracefulShutdown = async () => {
console.log("Shutting down service queue manager...");
this.isShuttingDown = true;
// Cancel all jobs
for (const queue of this.queues.values()) {
for (const job of queue.jobs) {
job.abortController.abort();
}
// Clear p-limit queues
queue.limit.clearQueue();
}
// Clear global queue
this.globalLimit.clearQueue();
// Wait a bit for jobs to finish cancelling
await new Promise((resolve) => setTimeout(resolve, 2000));
process.exit(0);
};
process.on("SIGTERM", gracefulShutdown);
process.on("SIGINT", gracefulShutdown);
}
// Remove a specific service queue entirely
removeServiceQueue(serviceId: string) {
const queue = this.queues.get(serviceId);
if (queue) {
// Cancel all jobs in the queue
for (const job of queue.jobs) {
job.abortController.abort();
}
// Clear p-limit queue
queue.limit.clearQueue();
this.queues.delete(serviceId);
console.log(`Removed service queue for ${serviceId}`);
}
}
}
// Global instance
export const serviceQueueManager = new ServiceQueueManager();
// Helper functions to maintain compatibility with existing code
export const addDeploymentJob = async (
serviceId: string,
jobData: DeploymentJob,
userId?: string,
): Promise<string> => {
return await serviceQueueManager.addJob(serviceId, jobData, userId);
};
export const cancelDeploymentJobs = (
serviceId: string,
applicationId?: string,
composeId?: string,
): number => {
return serviceQueueManager.cancelJobsByService(
serviceId,
applicationId,
composeId,
);
};
export const getDeploymentQueueStatus = (serviceId?: string) => {
if (serviceId) {
return serviceQueueManager.getQueueStatus(serviceId);
}
return serviceQueueManager.getAllQueuesStatus();
};
export const setGlobalConcurrency = (concurrency: number) => {
serviceQueueManager.setGlobalConcurrency(concurrency);
};
export const removeServiceQueue = (serviceId: string) => {
serviceQueueManager.removeServiceQueue(serviceId);
};
export const clearServiceQueue = (serviceId: string) => {
serviceQueueManager.clearServiceQueue(serviceId);
};
+1
View File
@@ -48,6 +48,7 @@ export const server = pgTable("server", {
sshKeyId: text("sshKeyId").references(() => sshKeys.sshKeyId, {
onDelete: "set null",
}),
concurrency: integer("concurrency").notNull().default(1),
metricsConfig: jsonb("metricsConfig")
.$type<{
server: {
+1
View File
@@ -62,6 +62,7 @@ export const users_temp = pgTable("user_temp", {
// Metrics
enablePaidFeatures: boolean("enablePaidFeatures").notNull().default(false),
allowImpersonation: boolean("allowImpersonation").notNull().default(false),
serverConcurrency: integer("serverConcurrency").notNull().default(1),
metricsConfig: jsonb("metricsConfig")
.$type<{
server: {
+11 -3
View File
@@ -280,9 +280,6 @@ importers:
boxen:
specifier: ^7.1.1
version: 7.1.1
bullmq:
specifier: 5.4.2
version: 5.4.2
class-variance-authority:
specifier: ^0.7.1
version: 0.7.1
@@ -367,6 +364,9 @@ importers:
otpauth:
specifier: ^9.4.0
version: 9.4.0
p-limit:
specifier: ^7.1.1
version: 7.1.1
pino:
specifier: 9.4.0
version: 9.4.0
@@ -6412,6 +6412,10 @@ packages:
resolution: {integrity: sha512-/Eaoq+QyLSiXQ4lyYV23f14mZRQcXnxfHrN0vCai+ak9G0pp9iEQukIIZq5NccEvwRB8PUnZT0KsOoDCINS1qQ==}
engines: {node: '>=18'}
p-limit@7.1.1:
resolution: {integrity: sha512-i8PyM2JnsNChVSYWLr2BAjNoLi0BAYC+wecOnZnVV+YSNJkzP7cWmvI34dk0WArWfH9KwBHNoZI3P3MppImlIA==}
engines: {node: '>=20'}
p-locate@4.1.0:
resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==}
engines: {node: '>=8'}
@@ -13869,6 +13873,10 @@ snapshots:
dependencies:
yocto-queue: 1.2.1
p-limit@7.1.1:
dependencies:
yocto-queue: 1.2.1
p-locate@4.1.0:
dependencies:
p-limit: 2.3.0