mirror of
https://github.com/dokploy/dokploy.git
synced 2026-06-14 03:19:49 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2da2b2dd39 |
@@ -79,7 +79,7 @@ export const ShowGeneralApplication = ({ applicationId }: Props) => {
|
||||
>
|
||||
<Button
|
||||
variant="default"
|
||||
isLoading={data?.applicationStatus === "running"}
|
||||
// isLoading={data?.applicationStatus === "running"}
|
||||
className="flex items-center gap-1.5 group focus-visible:ring-2 focus-visible:ring-offset-2"
|
||||
>
|
||||
<Tooltip>
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE "user_temp" ADD COLUMN "serverConcurrency" integer DEFAULT 1 NOT NULL;--> statement-breakpoint
|
||||
ALTER TABLE "server" ADD COLUMN "concurrency" integer DEFAULT 1 NOT NULL;
|
||||
File diff suppressed because it is too large
Load Diff
@@ -750,6 +750,13 @@
|
||||
"when": 1754912062243,
|
||||
"tag": "0106_purple_maggott",
|
||||
"breakpoints": true
|
||||
},
|
||||
{
|
||||
"idx": 107,
|
||||
"version": "7",
|
||||
"when": 1756436825081,
|
||||
"tag": "0107_calm_power_pack",
|
||||
"breakpoints": true
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -97,7 +97,6 @@
|
||||
"better-auth": "v1.2.8-beta.7",
|
||||
"bl": "6.0.11",
|
||||
"boxen": "^7.1.1",
|
||||
"bullmq": "5.4.2",
|
||||
"class-variance-authority": "^0.7.1",
|
||||
"clsx": "^2.1.1",
|
||||
"cmdk": "^0.2.1",
|
||||
@@ -126,6 +125,7 @@
|
||||
"nodemailer": "6.9.14",
|
||||
"octokit": "3.1.2",
|
||||
"otpauth": "^9.4.0",
|
||||
"p-limit": "^7.1.1",
|
||||
"pino": "9.4.0",
|
||||
"pino-pretty": "11.2.2",
|
||||
"postgres": "3.4.4",
|
||||
|
||||
@@ -54,7 +54,11 @@ import {
|
||||
applications,
|
||||
} from "@/server/db/schema";
|
||||
import type { DeploymentJob } from "@/server/queues/queue-types";
|
||||
import { cleanQueuesByApplication, myQueue } from "@/server/queues/queueSetup";
|
||||
import {
|
||||
addJobWithUserContext,
|
||||
cleanQueuesByApplication,
|
||||
myQueue,
|
||||
} from "@/server/queues/queueSetup";
|
||||
import { deploy } from "@/server/utils/deploy";
|
||||
import { uploadFileSchema } from "@/utils/schema";
|
||||
|
||||
@@ -668,14 +672,7 @@ export const applicationRouter = createTRPCRouter({
|
||||
|
||||
return true;
|
||||
}
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
);
|
||||
await addJobWithUserContext({ ...jobData }, ctx.user.id);
|
||||
}),
|
||||
|
||||
cleanQueues: protectedProcedure
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
# Queue System Migration - BullMQ to p-limit
|
||||
|
||||
This directory contains the new queue system that replaces BullMQ with [p-limit](https://github.com/sindresorhus/p-limit) for deployment queues.
|
||||
|
||||
## Why the Migration?
|
||||
|
||||
- **Resource Issues**: Users experienced freezing during builds due to resource constraints
|
||||
- **Cancellation Problems**: BullMQ workers couldn't be properly canceled when Docker processes restart
|
||||
- **Retry Loops**: Unwanted automatic retries when processes are killed
|
||||
|
||||
## New Architecture
|
||||
|
||||
### Key Features
|
||||
|
||||
1. **Per-Server Queues**: Deployments are grouped by server (local "dokploy-server" or remote servers)
|
||||
2. **Ordered Processing**: Within each server, deployments are processed based on server concurrency settings
|
||||
3. **Global User Concurrency**: User's `serverConcurrency` controls total deployments across all servers
|
||||
4. **Proper Cancellation**: Jobs can be canceled using AbortController
|
||||
5. **No Redis Dependency**: In-memory queues eliminate Redis dependency issues
|
||||
|
||||
### Files
|
||||
|
||||
- `service-queue.ts` - New p-limit based queue implementation
|
||||
- `queueSetup.ts` - Compatibility layer for existing code
|
||||
- `deployments-queue.ts` - Legacy compatibility exports
|
||||
- `queue-types.ts` - Shared type definitions
|
||||
|
||||
## Usage Examples
|
||||
|
||||
```typescript
|
||||
import { addJobWithUserContext, cancelDeploymentJobs, getDeploymentQueueStatus } from './queueSetup';
|
||||
|
||||
// Add a deployment job with user context (recommended for API routes)
|
||||
const result = await addJobWithUserContext({
|
||||
applicationType: 'application',
|
||||
applicationId: '123',
|
||||
type: 'deploy',
|
||||
titleLog: 'Deploying app',
|
||||
descriptionLog: 'Starting deployment',
|
||||
serverId: 'server-456' // Optional - for remote deployments
|
||||
}, 'user-id-789'); // User ID for concurrency settings
|
||||
|
||||
// Cancel jobs for a service
|
||||
const cancelled = cancelDeploymentJobs('app-123');
|
||||
|
||||
// Get queue status
|
||||
const status = getDeploymentQueueStatus('app-123');
|
||||
```
|
||||
|
||||
### Database-Driven Concurrency
|
||||
|
||||
The system now automatically reads concurrency settings from the database:
|
||||
|
||||
1. **Global User Concurrency**: From `users_temp.serverConcurrency` field
|
||||
- Controls the **TOTAL** number of deployments that can run simultaneously for a user
|
||||
- Example: If `serverConcurrency = 1`, only 1 deployment across ALL services at a time
|
||||
- Example: If `serverConcurrency = 3`, maximum 3 deployments can run simultaneously across all services
|
||||
|
||||
2. **Server Concurrency**: From `server.concurrency` field
|
||||
- Controls how many deployments can run simultaneously **on a specific server**
|
||||
- Only applies when deploying to remote servers (`serverId` is present)
|
||||
- Example: Server A can handle 2 concurrent deployments, Server B can handle 1
|
||||
|
||||
### Concurrency Hierarchy
|
||||
|
||||
```
|
||||
User Global Limit (users_temp.serverConcurrency)
|
||||
├── dokploy-server (local deployments)
|
||||
│ ├── App A deployment
|
||||
│ ├── App B deployment
|
||||
│ └── Compose C deployment
|
||||
├── remote-server-1 (server.concurrency = 2)
|
||||
│ ├── App D deployment
|
||||
│ └── App E deployment
|
||||
└── remote-server-2 (server.concurrency = 1)
|
||||
└── App F deployment
|
||||
```
|
||||
|
||||
**Example Scenarios:**
|
||||
|
||||
- **User has `serverConcurrency = 1`**: Only 1 deployment total across ALL servers
|
||||
- **User has `serverConcurrency = 3`**: Maximum 3 deployments simultaneously across all servers
|
||||
- **Local server**: All local apps/compose share the "dokploy-server" queue
|
||||
- **Remote server with `concurrency = 2`**: That server can handle up to 2 concurrent deployments
|
||||
- **Queue grouping**: `app-123` and `app-456` on same server share the same queue
|
||||
|
||||
## Configuration
|
||||
|
||||
- **Global Concurrency**: Set how many services can deploy simultaneously
|
||||
- **Service Concurrency**: Each service processes 1 deployment at a time (FIFO)
|
||||
|
||||
```typescript
|
||||
import { setGlobalConcurrency } from './service-queue';
|
||||
|
||||
// Allow 5 services to deploy simultaneously
|
||||
setGlobalConcurrency(5);
|
||||
```
|
||||
|
||||
## Migration Notes
|
||||
|
||||
- The schedules app still uses BullMQ for cron/repeatable jobs (different use case)
|
||||
- All existing API endpoints work unchanged due to compatibility layer
|
||||
- No breaking changes to existing functionality
|
||||
- Improved resource usage and cancellation capabilities
|
||||
@@ -1,122 +1,58 @@
|
||||
import {
|
||||
deployApplication,
|
||||
deployCompose,
|
||||
deployPreviewApplication,
|
||||
deployRemoteApplication,
|
||||
deployRemoteCompose,
|
||||
deployRemotePreviewApplication,
|
||||
rebuildApplication,
|
||||
rebuildCompose,
|
||||
rebuildRemoteApplication,
|
||||
rebuildRemoteCompose,
|
||||
updateApplicationStatus,
|
||||
updateCompose,
|
||||
updatePreviewDeployment,
|
||||
} from "@dokploy/server";
|
||||
import { type Job, Worker } from "bullmq";
|
||||
import type { DeploymentJob } from "./queue-types";
|
||||
import { redisConfig } from "./redis-connection";
|
||||
// This file is kept for backward compatibility but now uses the new service-queue system
|
||||
// The actual queue logic has been moved to service-queue.ts using p-limit
|
||||
|
||||
export const deploymentWorker = new Worker(
|
||||
"deployments",
|
||||
async (job: Job<DeploymentJob>) => {
|
||||
try {
|
||||
if (job.data.applicationType === "application") {
|
||||
await updateApplicationStatus(job.data.applicationId, "running");
|
||||
import { serviceQueueManager } from "./service-queue";
|
||||
|
||||
if (job.data.server) {
|
||||
if (job.data.type === "redeploy") {
|
||||
await rebuildRemoteApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "deploy") {
|
||||
await deployRemoteApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (job.data.type === "redeploy") {
|
||||
await rebuildApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "deploy") {
|
||||
await deployApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if (job.data.applicationType === "compose") {
|
||||
await updateCompose(job.data.composeId, {
|
||||
composeStatus: "running",
|
||||
});
|
||||
|
||||
if (job.data.server) {
|
||||
if (job.data.type === "redeploy") {
|
||||
await rebuildRemoteCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "deploy") {
|
||||
await deployRemoteCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (job.data.type === "deploy") {
|
||||
await deployCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "redeploy") {
|
||||
await rebuildCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if (job.data.applicationType === "application-preview") {
|
||||
await updatePreviewDeployment(job.data.previewDeploymentId, {
|
||||
previewStatus: "running",
|
||||
});
|
||||
if (job.data.server) {
|
||||
if (job.data.type === "deploy") {
|
||||
await deployRemotePreviewApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
previewDeploymentId: job.data.previewDeploymentId,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (job.data.type === "deploy") {
|
||||
await deployPreviewApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
previewDeploymentId: job.data.previewDeploymentId,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.log("Error", error);
|
||||
}
|
||||
// Legacy compatibility - this is no longer used but kept to avoid breaking imports
|
||||
export const deploymentWorker = {
|
||||
run: async () => {
|
||||
console.log(
|
||||
"Legacy deploymentWorker.run() called - now using service-queue system",
|
||||
);
|
||||
// The service queue manager starts automatically, no need to do anything
|
||||
return Promise.resolve();
|
||||
},
|
||||
{
|
||||
autorun: false,
|
||||
connection: redisConfig,
|
||||
close: async () => {
|
||||
console.log("Legacy deploymentWorker.close() called");
|
||||
return Promise.resolve();
|
||||
},
|
||||
);
|
||||
};
|
||||
|
||||
// Legacy exports for backward compatibility
|
||||
export const getWorkersMap = () => {
|
||||
console.warn(
|
||||
"getWorkersMap() is deprecated - use serviceQueueManager instead",
|
||||
);
|
||||
return {};
|
||||
};
|
||||
|
||||
export const getWorker = (_serverId?: string) => {
|
||||
console.warn("getWorker() is deprecated - use serviceQueueManager instead");
|
||||
return undefined;
|
||||
};
|
||||
|
||||
export const createDeploymentWorker = (defaultConcurrency = 1) => {
|
||||
console.warn(
|
||||
"createDeploymentWorker() is deprecated - use serviceQueueManager instead",
|
||||
);
|
||||
serviceQueueManager.setGlobalConcurrency(defaultConcurrency);
|
||||
return deploymentWorker;
|
||||
};
|
||||
|
||||
export const createServerDeploymentWorker = (
|
||||
_serverId: string,
|
||||
_concurrency = 1,
|
||||
) => {
|
||||
console.warn(
|
||||
"createServerDeploymentWorker() is deprecated - use serviceQueueManager instead",
|
||||
);
|
||||
// The new system automatically creates queues per service, no need for explicit worker creation
|
||||
return deploymentWorker;
|
||||
};
|
||||
|
||||
export const removeServerDeploymentWorker = (serverId: string) => {
|
||||
console.warn(
|
||||
"removeServerDeploymentWorker() is deprecated - use removeServiceQueue instead",
|
||||
);
|
||||
serviceQueueManager.removeServiceQueue(serverId);
|
||||
};
|
||||
|
||||
@@ -1,44 +1,101 @@
|
||||
import { Queue } from "bullmq";
|
||||
import { redisConfig } from "./redis-connection";
|
||||
import type { DeploymentJob } from "./queue-types";
|
||||
import {
|
||||
addDeploymentJob,
|
||||
cancelDeploymentJobs,
|
||||
getDeploymentQueueStatus,
|
||||
setGlobalConcurrency,
|
||||
} from "./service-queue";
|
||||
|
||||
const myQueue = new Queue("deployments", {
|
||||
connection: redisConfig,
|
||||
});
|
||||
// Default queue name for local deployments
|
||||
export const DEFAULT_QUEUE = "default";
|
||||
|
||||
process.on("SIGTERM", () => {
|
||||
myQueue.close();
|
||||
process.exit(0);
|
||||
});
|
||||
// Initialize with default concurrency of 3 services
|
||||
setGlobalConcurrency(3);
|
||||
|
||||
myQueue.on("error", (error) => {
|
||||
if ((error as any).code === "ECONNREFUSED") {
|
||||
console.error(
|
||||
"Make sure you have installed Redis and it is running.",
|
||||
error,
|
||||
);
|
||||
// Helper function to determine service ID from job data
|
||||
// Groups deployments by SERVER, not by individual application/compose
|
||||
const getServiceId = (jobData: DeploymentJob): string => {
|
||||
// If it has a serverId, group by that server
|
||||
if (jobData.serverId) {
|
||||
return jobData.serverId;
|
||||
}
|
||||
});
|
||||
|
||||
// For local deployments (no serverId), group all under the main Dokploy server
|
||||
return "dokploy-server";
|
||||
};
|
||||
|
||||
// Compatibility functions to replace BullMQ usage
|
||||
export const myQueue = {
|
||||
add: async (
|
||||
_name: string,
|
||||
jobData: DeploymentJob,
|
||||
_options?: any,
|
||||
userId?: string,
|
||||
) => {
|
||||
const serviceId = getServiceId(jobData);
|
||||
const jobId = await addDeploymentJob(serviceId, jobData, userId);
|
||||
console.log(`Added deployment job ${jobId} to service ${serviceId}`);
|
||||
return { id: jobId };
|
||||
},
|
||||
|
||||
close: () => {
|
||||
console.log("Service queue manager shutdown initiated");
|
||||
return Promise.resolve();
|
||||
},
|
||||
};
|
||||
|
||||
export const cleanQueuesByApplication = async (applicationId: string) => {
|
||||
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
|
||||
// Cancel jobs for this specific application across all servers
|
||||
let totalCancelled = 0;
|
||||
|
||||
for (const job of jobs) {
|
||||
if (job?.data?.applicationId === applicationId) {
|
||||
await job.remove();
|
||||
console.log(`Removed job ${job.id} for application ${applicationId}`);
|
||||
}
|
||||
}
|
||||
// Check the local Dokploy server
|
||||
const localCancelled = cancelDeploymentJobs(
|
||||
"dokploy-server",
|
||||
applicationId,
|
||||
undefined,
|
||||
);
|
||||
totalCancelled += localCancelled;
|
||||
|
||||
// TODO: Also check remote servers if we need to track which servers have this application
|
||||
// For now, we only clean from the local server queue
|
||||
|
||||
console.log(
|
||||
`Cancelled ${totalCancelled} jobs for application ${applicationId}`,
|
||||
);
|
||||
return totalCancelled;
|
||||
};
|
||||
|
||||
export const cleanQueuesByCompose = async (composeId: string) => {
|
||||
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
|
||||
// Cancel jobs for this specific compose across all servers
|
||||
let totalCancelled = 0;
|
||||
|
||||
for (const job of jobs) {
|
||||
if (job?.data?.composeId === composeId) {
|
||||
await job.remove();
|
||||
console.log(`Removed job ${job.id} for compose ${composeId}`);
|
||||
}
|
||||
}
|
||||
// Check the local Dokploy server
|
||||
const localCancelled = cancelDeploymentJobs(
|
||||
"dokploy-server",
|
||||
undefined,
|
||||
composeId,
|
||||
);
|
||||
totalCancelled += localCancelled;
|
||||
|
||||
// TODO: Also check remote servers if we need to track which servers have this compose
|
||||
// For now, we only clean from the local server queue
|
||||
|
||||
console.log(`Cancelled ${totalCancelled} jobs for compose ${composeId}`);
|
||||
return totalCancelled;
|
||||
};
|
||||
|
||||
export { myQueue };
|
||||
// Export queue status for monitoring
|
||||
export const getQueueStatus = getDeploymentQueueStatus;
|
||||
|
||||
// New function to add jobs with user context (for API routes)
|
||||
export const addJobWithUserContext = async (
|
||||
jobData: DeploymentJob,
|
||||
userId?: string,
|
||||
): Promise<{ id: string }> => {
|
||||
const serviceId = getServiceId(jobData);
|
||||
const jobId = await addDeploymentJob(serviceId, jobData, userId);
|
||||
console.log(
|
||||
`Added deployment job ${jobId} to service ${serviceId} with user context ${userId || "none"}`,
|
||||
);
|
||||
return { id: jobId };
|
||||
};
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
import type { ConnectionOptions } from "bullmq";
|
||||
|
||||
export const redisConfig: ConnectionOptions = {
|
||||
host:
|
||||
process.env.NODE_ENV === "production"
|
||||
? process.env.REDIS_HOST || "dokploy-redis"
|
||||
: "127.0.0.1",
|
||||
};
|
||||
@@ -0,0 +1,500 @@
|
||||
import {
|
||||
deployApplication,
|
||||
deployCompose,
|
||||
deployPreviewApplication,
|
||||
deployRemoteApplication,
|
||||
deployRemoteCompose,
|
||||
deployRemotePreviewApplication,
|
||||
findServerById,
|
||||
rebuildApplication,
|
||||
rebuildCompose,
|
||||
rebuildRemoteApplication,
|
||||
rebuildRemoteCompose,
|
||||
updateApplicationStatus,
|
||||
updateCompose,
|
||||
updatePreviewDeployment,
|
||||
} from "@dokploy/server";
|
||||
import { db } from "@dokploy/server/db";
|
||||
import { users_temp } from "@dokploy/server/db/schema";
|
||||
import { eq } from "drizzle-orm";
|
||||
import pLimit from "p-limit";
|
||||
import type { DeploymentJob } from "./queue-types";
|
||||
|
||||
// Types for our p-limit based queue system
|
||||
export interface QueueJob {
|
||||
id: string;
|
||||
data: DeploymentJob;
|
||||
createdAt: Date;
|
||||
status: "waiting" | "processing" | "completed" | "failed" | "cancelled";
|
||||
abortController: AbortController;
|
||||
promise?: Promise<void>;
|
||||
}
|
||||
|
||||
export interface ServiceQueue {
|
||||
serviceId: string;
|
||||
jobs: QueueJob[];
|
||||
limit: ReturnType<typeof pLimit>; // p-limit instance with concurrency 1
|
||||
}
|
||||
|
||||
// Global queue management using p-limit
|
||||
class ServiceQueueManager {
|
||||
private queues: Map<string, ServiceQueue> = new Map();
|
||||
private globalLimit: ReturnType<typeof pLimit>;
|
||||
private isShuttingDown = false;
|
||||
|
||||
constructor(globalConcurrency = 3) {
|
||||
// Global limit controls how many services can deploy simultaneously
|
||||
this.globalLimit = pLimit(globalConcurrency);
|
||||
this.setupShutdownHandlers();
|
||||
}
|
||||
|
||||
// Set global concurrency (how many services can deploy simultaneously)
|
||||
setGlobalConcurrency(concurrency: number) {
|
||||
this.globalLimit = pLimit(concurrency);
|
||||
}
|
||||
|
||||
// Get concurrency settings from database
|
||||
private async getConcurrencySettings(jobData: DeploymentJob): Promise<{
|
||||
serviceConcurrency: number;
|
||||
}> {
|
||||
try {
|
||||
// Default: Each service processes 1 deployment at a time (FIFO within service)
|
||||
let serviceConcurrency = 1;
|
||||
|
||||
// If it's a server deployment, get server-specific concurrency
|
||||
// This controls how many deployments can run simultaneously ON THAT SERVER
|
||||
if (jobData.serverId) {
|
||||
try {
|
||||
const serverData = await findServerById(jobData.serverId);
|
||||
serviceConcurrency = serverData.concurrency || 1;
|
||||
console.log(
|
||||
`Server ${jobData.serverId} can handle ${serviceConcurrency} concurrent deployments`,
|
||||
);
|
||||
} catch (error) {
|
||||
console.warn(
|
||||
`Could not get server concurrency for ${jobData.serverId}, using default: 1`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
serviceConcurrency,
|
||||
};
|
||||
} catch (error) {
|
||||
console.warn(
|
||||
"Error getting concurrency settings, using defaults:",
|
||||
error,
|
||||
);
|
||||
return {
|
||||
serviceConcurrency: 1,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Get or create a queue for a service with dynamic concurrency
|
||||
private async getOrCreateQueue(
|
||||
serviceId: string,
|
||||
jobData?: DeploymentJob,
|
||||
): Promise<ServiceQueue> {
|
||||
if (!this.queues.has(serviceId)) {
|
||||
let serviceConcurrency = 1; // Default
|
||||
|
||||
// Get concurrency from database if we have job data
|
||||
if (jobData) {
|
||||
const settings = await this.getConcurrencySettings(jobData);
|
||||
serviceConcurrency = settings.serviceConcurrency;
|
||||
}
|
||||
|
||||
this.queues.set(serviceId, {
|
||||
serviceId,
|
||||
jobs: [],
|
||||
// Service concurrency from database or default to 1
|
||||
limit: pLimit(serviceConcurrency),
|
||||
});
|
||||
|
||||
console.log(
|
||||
`Created queue for service ${serviceId} with concurrency: ${serviceConcurrency}`,
|
||||
);
|
||||
}
|
||||
return this.queues.get(serviceId)!;
|
||||
}
|
||||
|
||||
// Add a job to a service queue
|
||||
async addJob(
|
||||
serviceId: string,
|
||||
jobData: DeploymentJob,
|
||||
userId?: string,
|
||||
): Promise<string> {
|
||||
if (this.isShuttingDown) {
|
||||
throw new Error("Queue manager is shutting down");
|
||||
}
|
||||
|
||||
// Update global concurrency based on user settings if provided
|
||||
// This controls the TOTAL number of deployments across ALL services for this user
|
||||
if (userId) {
|
||||
try {
|
||||
const userData = await db.query.users_temp.findFirst({
|
||||
where: eq(users_temp.id, userId),
|
||||
});
|
||||
|
||||
if (userData?.serverConcurrency) {
|
||||
// This is GLOBAL concurrency - total deployments across all services
|
||||
this.globalLimit = pLimit(userData.serverConcurrency);
|
||||
console.log(
|
||||
`Set GLOBAL concurrency to ${userData.serverConcurrency} deployments total for user ${userId}`,
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
console.warn(
|
||||
`Could not get user concurrency settings for ${userId}:`,
|
||||
error,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const queue = await this.getOrCreateQueue(serviceId, jobData);
|
||||
const jobId = `${serviceId}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
|
||||
|
||||
const job: QueueJob = {
|
||||
id: jobId,
|
||||
data: jobData,
|
||||
createdAt: new Date(),
|
||||
status: "waiting",
|
||||
abortController: new AbortController(),
|
||||
};
|
||||
|
||||
queue.jobs.push(job);
|
||||
console.log(
|
||||
`Added job ${jobId} to service ${serviceId} queue. Queue length: ${queue.jobs.length}`,
|
||||
);
|
||||
|
||||
// Start processing the job using p-limit
|
||||
this.processJob(queue, job);
|
||||
|
||||
return jobId;
|
||||
}
|
||||
|
||||
// Process a job using both global and service-level p-limit
|
||||
private processJob(queue: ServiceQueue, job: QueueJob) {
|
||||
// Use global limit to control cross-service concurrency
|
||||
job.promise = this.globalLimit(() =>
|
||||
// Use service limit to ensure ordered processing within service
|
||||
queue.limit(async () => {
|
||||
if (job.status === "cancelled" || this.isShuttingDown) {
|
||||
return;
|
||||
}
|
||||
|
||||
job.status = "processing";
|
||||
console.log(`Processing job ${job.id} for service ${queue.serviceId}`);
|
||||
|
||||
try {
|
||||
await this.executeJob(job);
|
||||
job.status = "completed";
|
||||
console.log(`Completed job ${job.id} for service ${queue.serviceId}`);
|
||||
} catch (error) {
|
||||
if (job.abortController.signal.aborted) {
|
||||
job.status = "cancelled";
|
||||
console.log(
|
||||
`Job ${job.id} was cancelled for service ${queue.serviceId}`,
|
||||
);
|
||||
} else {
|
||||
job.status = "failed";
|
||||
console.error(
|
||||
`Job ${job.id} failed for service ${queue.serviceId}:`,
|
||||
error,
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
// Clean up completed/failed jobs after a delay
|
||||
setTimeout(() => {
|
||||
queue.jobs = queue.jobs.filter((j) => j.id !== job.id);
|
||||
}, 5000);
|
||||
}
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
// Remove/cancel jobs for a specific service
|
||||
cancelJobsByService(
|
||||
serviceId: string,
|
||||
applicationId?: string,
|
||||
composeId?: string,
|
||||
): number {
|
||||
const queue = this.queues.get(serviceId);
|
||||
if (!queue) return 0;
|
||||
|
||||
let cancelledCount = 0;
|
||||
|
||||
// Cancel waiting and processing jobs
|
||||
for (const job of queue.jobs) {
|
||||
if (job.status === "waiting" || job.status === "processing") {
|
||||
// Check if this job matches the filter criteria
|
||||
const matchesApplication = applicationId
|
||||
? (job.data.applicationType === "application" ||
|
||||
job.data.applicationType === "application-preview") &&
|
||||
job.data.applicationId === applicationId
|
||||
: true;
|
||||
const matchesCompose = composeId
|
||||
? job.data.applicationType === "compose" &&
|
||||
job.data.composeId === composeId
|
||||
: true;
|
||||
|
||||
if (matchesApplication && matchesCompose) {
|
||||
job.status = "cancelled";
|
||||
job.abortController.abort();
|
||||
cancelledCount++;
|
||||
console.log(`Cancelled job ${job.id} for service ${serviceId}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove cancelled jobs from queue immediately
|
||||
queue.jobs = queue.jobs.filter((job) => job.status !== "cancelled");
|
||||
|
||||
return cancelledCount;
|
||||
}
|
||||
|
||||
// Get queue status for a service
|
||||
getQueueStatus(serviceId: string) {
|
||||
const queue = this.queues.get(serviceId);
|
||||
if (!queue) return null;
|
||||
|
||||
return {
|
||||
serviceId,
|
||||
totalJobs: queue.jobs.length,
|
||||
waitingJobs: queue.jobs.filter((j) => j.status === "waiting").length,
|
||||
processingJobs: queue.jobs.filter((j) => j.status === "processing")
|
||||
.length,
|
||||
completedJobs: queue.jobs.filter((j) => j.status === "completed").length,
|
||||
failedJobs: queue.jobs.filter((j) => j.status === "failed").length,
|
||||
// p-limit queue status
|
||||
activeCount: queue.limit.activeCount,
|
||||
pendingCount: queue.limit.pendingCount,
|
||||
};
|
||||
}
|
||||
|
||||
// Get all queues status
|
||||
getAllQueuesStatus() {
|
||||
const status: Record<string, any> = {};
|
||||
for (const [serviceId] of this.queues) {
|
||||
status[serviceId] = this.getQueueStatus(serviceId);
|
||||
}
|
||||
status.global = {
|
||||
activeCount: this.globalLimit.activeCount,
|
||||
pendingCount: this.globalLimit.pendingCount,
|
||||
concurrency: this.globalLimit.concurrency,
|
||||
};
|
||||
return status;
|
||||
}
|
||||
|
||||
// Clear pending jobs from a service queue using p-limit's clearQueue
|
||||
clearServiceQueue(serviceId: string) {
|
||||
const queue = this.queues.get(serviceId);
|
||||
if (queue) {
|
||||
// Cancel all waiting jobs
|
||||
for (const job of queue.jobs) {
|
||||
if (job.status === "waiting") {
|
||||
job.status = "cancelled";
|
||||
job.abortController.abort();
|
||||
}
|
||||
}
|
||||
|
||||
// Clear p-limit's internal queue
|
||||
queue.limit.clearQueue();
|
||||
|
||||
// Remove cancelled jobs
|
||||
queue.jobs = queue.jobs.filter((job) => job.status !== "cancelled");
|
||||
|
||||
console.log(`Cleared service queue for ${serviceId}`);
|
||||
}
|
||||
}
|
||||
|
||||
private async executeJob(job: QueueJob): Promise<void> {
|
||||
const { data } = job;
|
||||
|
||||
// Check if job was cancelled before execution
|
||||
if (job.abortController.signal.aborted) {
|
||||
throw new Error("Job was cancelled");
|
||||
}
|
||||
|
||||
try {
|
||||
if (data.applicationType === "application") {
|
||||
await updateApplicationStatus(data.applicationId, "running");
|
||||
|
||||
if (data.server) {
|
||||
if (data.type === "redeploy") {
|
||||
await rebuildRemoteApplication({
|
||||
applicationId: data.applicationId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
} else if (data.type === "deploy") {
|
||||
await deployRemoteApplication({
|
||||
applicationId: data.applicationId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (data.type === "redeploy") {
|
||||
await rebuildApplication({
|
||||
applicationId: data.applicationId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
} else if (data.type === "deploy") {
|
||||
await deployApplication({
|
||||
applicationId: data.applicationId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if (data.applicationType === "compose") {
|
||||
await updateCompose(data.composeId, {
|
||||
composeStatus: "running",
|
||||
});
|
||||
|
||||
if (data.server) {
|
||||
if (data.type === "redeploy") {
|
||||
await rebuildRemoteCompose({
|
||||
composeId: data.composeId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
} else if (data.type === "deploy") {
|
||||
await deployRemoteCompose({
|
||||
composeId: data.composeId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (data.type === "deploy") {
|
||||
await deployCompose({
|
||||
composeId: data.composeId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
} else if (data.type === "redeploy") {
|
||||
await rebuildCompose({
|
||||
composeId: data.composeId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if (data.applicationType === "application-preview") {
|
||||
await updatePreviewDeployment(data.previewDeploymentId, {
|
||||
previewStatus: "running",
|
||||
});
|
||||
if (data.server) {
|
||||
if (data.type === "deploy") {
|
||||
await deployRemotePreviewApplication({
|
||||
applicationId: data.applicationId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
previewDeploymentId: data.previewDeploymentId,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (data.type === "deploy") {
|
||||
await deployPreviewApplication({
|
||||
applicationId: data.applicationId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
previewDeploymentId: data.previewDeploymentId,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.log("Deployment Error", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private setupShutdownHandlers() {
|
||||
const gracefulShutdown = async () => {
|
||||
console.log("Shutting down service queue manager...");
|
||||
this.isShuttingDown = true;
|
||||
|
||||
// Cancel all jobs
|
||||
for (const queue of this.queues.values()) {
|
||||
for (const job of queue.jobs) {
|
||||
job.abortController.abort();
|
||||
}
|
||||
// Clear p-limit queues
|
||||
queue.limit.clearQueue();
|
||||
}
|
||||
|
||||
// Clear global queue
|
||||
this.globalLimit.clearQueue();
|
||||
|
||||
// Wait a bit for jobs to finish cancelling
|
||||
await new Promise((resolve) => setTimeout(resolve, 2000));
|
||||
process.exit(0);
|
||||
};
|
||||
|
||||
process.on("SIGTERM", gracefulShutdown);
|
||||
process.on("SIGINT", gracefulShutdown);
|
||||
}
|
||||
|
||||
// Remove a specific service queue entirely
|
||||
removeServiceQueue(serviceId: string) {
|
||||
const queue = this.queues.get(serviceId);
|
||||
if (queue) {
|
||||
// Cancel all jobs in the queue
|
||||
for (const job of queue.jobs) {
|
||||
job.abortController.abort();
|
||||
}
|
||||
// Clear p-limit queue
|
||||
queue.limit.clearQueue();
|
||||
this.queues.delete(serviceId);
|
||||
console.log(`Removed service queue for ${serviceId}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Global instance
|
||||
export const serviceQueueManager = new ServiceQueueManager();
|
||||
|
||||
// Helper functions to maintain compatibility with existing code
|
||||
export const addDeploymentJob = async (
|
||||
serviceId: string,
|
||||
jobData: DeploymentJob,
|
||||
userId?: string,
|
||||
): Promise<string> => {
|
||||
return await serviceQueueManager.addJob(serviceId, jobData, userId);
|
||||
};
|
||||
|
||||
export const cancelDeploymentJobs = (
|
||||
serviceId: string,
|
||||
applicationId?: string,
|
||||
composeId?: string,
|
||||
): number => {
|
||||
return serviceQueueManager.cancelJobsByService(
|
||||
serviceId,
|
||||
applicationId,
|
||||
composeId,
|
||||
);
|
||||
};
|
||||
|
||||
export const getDeploymentQueueStatus = (serviceId?: string) => {
|
||||
if (serviceId) {
|
||||
return serviceQueueManager.getQueueStatus(serviceId);
|
||||
}
|
||||
return serviceQueueManager.getAllQueuesStatus();
|
||||
};
|
||||
|
||||
export const setGlobalConcurrency = (concurrency: number) => {
|
||||
serviceQueueManager.setGlobalConcurrency(concurrency);
|
||||
};
|
||||
|
||||
export const removeServiceQueue = (serviceId: string) => {
|
||||
serviceQueueManager.removeServiceQueue(serviceId);
|
||||
};
|
||||
|
||||
export const clearServiceQueue = (serviceId: string) => {
|
||||
serviceQueueManager.clearServiceQueue(serviceId);
|
||||
};
|
||||
@@ -48,6 +48,7 @@ export const server = pgTable("server", {
|
||||
sshKeyId: text("sshKeyId").references(() => sshKeys.sshKeyId, {
|
||||
onDelete: "set null",
|
||||
}),
|
||||
concurrency: integer("concurrency").notNull().default(1),
|
||||
metricsConfig: jsonb("metricsConfig")
|
||||
.$type<{
|
||||
server: {
|
||||
|
||||
@@ -62,6 +62,7 @@ export const users_temp = pgTable("user_temp", {
|
||||
// Metrics
|
||||
enablePaidFeatures: boolean("enablePaidFeatures").notNull().default(false),
|
||||
allowImpersonation: boolean("allowImpersonation").notNull().default(false),
|
||||
serverConcurrency: integer("serverConcurrency").notNull().default(1),
|
||||
metricsConfig: jsonb("metricsConfig")
|
||||
.$type<{
|
||||
server: {
|
||||
|
||||
Generated
+11
-3
@@ -280,9 +280,6 @@ importers:
|
||||
boxen:
|
||||
specifier: ^7.1.1
|
||||
version: 7.1.1
|
||||
bullmq:
|
||||
specifier: 5.4.2
|
||||
version: 5.4.2
|
||||
class-variance-authority:
|
||||
specifier: ^0.7.1
|
||||
version: 0.7.1
|
||||
@@ -367,6 +364,9 @@ importers:
|
||||
otpauth:
|
||||
specifier: ^9.4.0
|
||||
version: 9.4.0
|
||||
p-limit:
|
||||
specifier: ^7.1.1
|
||||
version: 7.1.1
|
||||
pino:
|
||||
specifier: 9.4.0
|
||||
version: 9.4.0
|
||||
@@ -6412,6 +6412,10 @@ packages:
|
||||
resolution: {integrity: sha512-/Eaoq+QyLSiXQ4lyYV23f14mZRQcXnxfHrN0vCai+ak9G0pp9iEQukIIZq5NccEvwRB8PUnZT0KsOoDCINS1qQ==}
|
||||
engines: {node: '>=18'}
|
||||
|
||||
p-limit@7.1.1:
|
||||
resolution: {integrity: sha512-i8PyM2JnsNChVSYWLr2BAjNoLi0BAYC+wecOnZnVV+YSNJkzP7cWmvI34dk0WArWfH9KwBHNoZI3P3MppImlIA==}
|
||||
engines: {node: '>=20'}
|
||||
|
||||
p-locate@4.1.0:
|
||||
resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==}
|
||||
engines: {node: '>=8'}
|
||||
@@ -13869,6 +13873,10 @@ snapshots:
|
||||
dependencies:
|
||||
yocto-queue: 1.2.1
|
||||
|
||||
p-limit@7.1.1:
|
||||
dependencies:
|
||||
yocto-queue: 1.2.1
|
||||
|
||||
p-locate@4.1.0:
|
||||
dependencies:
|
||||
p-limit: 2.3.0
|
||||
|
||||
Reference in New Issue
Block a user