Compare commits

...

2 Commits

Author SHA1 Message Date
Mauricio Siu 1379b2118f Merge branch 'canary' into feat/add-concurrent-builds 2025-11-17 22:05:31 -06:00
Mauricio Siu 794cd79973 feat: add comprehensive testing for grouped queue and queue manager functionality
- Introduced tests for the GroupedQueue class, covering basic functionality, concurrency handling, and job processing across multiple groups.
- Added tests for the QueueManager class, ensuring correct queue creation, job management, and handler functionality.
- Implemented tests for concurrency changes and their effects on pending tasks, enhancing overall test coverage for the queue system.
- Created a new ChangeConcurrencyModal component for adjusting deployment concurrency settings in the UI.
2025-11-15 17:31:52 -06:00
19 changed files with 2180 additions and 172 deletions
@@ -0,0 +1,809 @@
import { describe, expect, it } from "vitest";
import { GroupedQueue } from "../../server/queues/grouped-queue-wrapper";
describe("GroupedQueue", () => {
describe("Basic functionality", () => {
it("should process a single job with concurrency 1", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
const processed: string[] = [];
queue.setHandler(async (data) => {
await new Promise((resolve) => setTimeout(resolve, 50));
processed.push(data.id);
});
await queue.add("group1", { id: "job1" });
// Wait for processing to complete
await new Promise((resolve) => setTimeout(resolve, 100));
expect(processed).toEqual(["job1"]);
expect(queue.isIdle()).toBe(true);
});
it("should process jobs in FIFO order within a group", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
const processed: string[] = [];
queue.setHandler(async (data) => {
await new Promise((resolve) => setTimeout(resolve, 20));
processed.push(data.id);
});
// Add multiple jobs to the same group
await Promise.all([
queue.add("group1", { id: "job1" }),
queue.add("group1", { id: "job2" }),
queue.add("group1", { id: "job3" }),
]);
// Wait for all processing
await new Promise((resolve) => setTimeout(resolve, 200));
expect(processed).toEqual(["job1", "job2", "job3"]);
});
});
describe("Concurrency 1 with multiple groups", () => {
it("should process one group at a time with concurrency 1", async () => {
const queue = new GroupedQueue<{ id: string; group: string }>(1);
const processed: string[] = [];
const activeGroups: string[] = [];
queue.setHandler(async (data) => {
activeGroups.push(data.group);
await new Promise((resolve) => setTimeout(resolve, 50));
processed.push(data.id);
activeGroups.pop();
});
// Add jobs to 3 different groups
const promises = [
queue.add("app1", { id: "job1", group: "app1" }),
queue.add("app2", { id: "job2", group: "app2" }),
queue.add("app3", { id: "job3", group: "app3" }),
];
// Check after 30ms - only one should be processing
await new Promise((resolve) => setTimeout(resolve, 30));
expect(activeGroups.length).toBeLessThanOrEqual(1);
// Wait for all to complete
await Promise.all(promises);
await new Promise((resolve) => setTimeout(resolve, 200));
expect(processed).toHaveLength(3);
expect(queue.isIdle()).toBe(true);
});
it("should process groups sequentially with concurrency 1", async () => {
const queue = new GroupedQueue<{ id: string; group: string }>(1);
const processingOrder: string[] = [];
const startTimes: Map<string, number> = new Map();
const endTimes: Map<string, number> = new Map();
queue.setHandler(async (data) => {
startTimes.set(data.id, Date.now());
processingOrder.push(`start-${data.group}`);
await new Promise((resolve) => setTimeout(resolve, 50));
endTimes.set(data.id, Date.now());
processingOrder.push(`end-${data.group}`);
});
await Promise.all([
queue.add("app1", { id: "job1", group: "app1" }),
queue.add("app2", { id: "job2", group: "app2" }),
queue.add("app3", { id: "job3", group: "app3" }),
]);
await new Promise((resolve) => setTimeout(resolve, 300));
// Verify sequential processing
expect(processingOrder).toEqual([
"start-app1",
"end-app1",
"start-app2",
"end-app2",
"start-app3",
"end-app3",
]);
// Verify jobs don't overlap
const job1End = endTimes.get("job1")!;
const job2Start = startTimes.get("job2")!;
const job2End = endTimes.get("job2")!;
const job3Start = startTimes.get("job3")!;
expect(job2Start).toBeGreaterThanOrEqual(job1End);
expect(job3Start).toBeGreaterThanOrEqual(job2End);
});
});
describe("Concurrency 3 with 4 groups", () => {
it("should process up to 3 groups simultaneously", async () => {
const queue = new GroupedQueue<{ id: string; group: string }>(3);
const activeGroups = new Set<string>();
const maxConcurrent = { value: 0 };
queue.setHandler(async (data) => {
activeGroups.add(data.group);
maxConcurrent.value = Math.max(maxConcurrent.value, activeGroups.size);
await new Promise((resolve) => setTimeout(resolve, 100));
activeGroups.delete(data.group);
});
// Add 4 jobs to different groups
await Promise.all([
queue.add("app1", { id: "job1", group: "app1" }),
queue.add("app2", { id: "job2", group: "app2" }),
queue.add("app3", { id: "job3", group: "app3" }),
queue.add("app4", { id: "job4", group: "app4" }),
]);
// Check during processing
await new Promise((resolve) => setTimeout(resolve, 50));
// Should have processed 3 groups simultaneously
expect(maxConcurrent.value).toBe(3);
expect(activeGroups.size).toBeLessThanOrEqual(3);
// Wait for all to complete
await new Promise((resolve) => setTimeout(resolve, 200));
expect(queue.isIdle()).toBe(true);
});
it("should process 4th group after one of the first 3 completes", async () => {
const queue = new GroupedQueue<{ id: string; group: string }>(3);
const processingOrder: string[] = [];
queue.setHandler(async (data) => {
processingOrder.push(`start-${data.group}`);
await new Promise((resolve) => setTimeout(resolve, 100));
processingOrder.push(`end-${data.group}`);
});
await Promise.all([
queue.add("app1", { id: "job1", group: "app1" }),
queue.add("app2", { id: "job2", group: "app2" }),
queue.add("app3", { id: "job3", group: "app3" }),
queue.add("app4", { id: "job4", group: "app4" }),
]);
await new Promise((resolve) => setTimeout(resolve, 250));
// First 3 should start together
const firstThree = processingOrder.slice(0, 3);
expect(firstThree).toContain("start-app1");
expect(firstThree).toContain("start-app2");
expect(firstThree).toContain("start-app3");
// 4th should start after one completes
const app4StartIndex = processingOrder.indexOf("start-app4");
expect(app4StartIndex).toBeGreaterThan(0);
expect(app4StartIndex).toBeLessThan(processingOrder.length - 1);
});
});
describe("Multiple jobs per group", () => {
it("should process jobs sequentially within same group", async () => {
const queue = new GroupedQueue<{ id: string }>(3);
const processed: string[] = [];
queue.setHandler(async (data) => {
await new Promise((resolve) => setTimeout(resolve, 30));
processed.push(data.id);
});
// Add 3 jobs to same group
await Promise.all([
queue.add("app1", { id: "job1" }),
queue.add("app1", { id: "job2" }),
queue.add("app1", { id: "job3" }),
]);
await new Promise((resolve) => setTimeout(resolve, 200));
// Should process in order
expect(processed).toEqual(["job1", "job2", "job3"]);
});
it("should process multiple groups with multiple jobs each", async () => {
const queue = new GroupedQueue<{ id: string; group: string }>(2);
const processed: string[] = [];
queue.setHandler(async (data) => {
await new Promise((resolve) => setTimeout(resolve, 20));
processed.push(`${data.group}-${data.id}`);
});
// Add jobs to 2 groups, 2 jobs each
await Promise.all([
queue.add("app1", { id: "job1", group: "app1" }),
queue.add("app1", { id: "job2", group: "app1" }),
queue.add("app2", { id: "job1", group: "app2" }),
queue.add("app2", { id: "job2", group: "app2" }),
]);
await new Promise((resolve) => setTimeout(resolve, 200));
// Should process both groups, jobs within each group in order
expect(processed).toHaveLength(4);
expect(processed.filter((p) => p.startsWith("app1"))).toEqual([
"app1-job1",
"app1-job2",
]);
expect(processed.filter((p) => p.startsWith("app2"))).toEqual([
"app2-job1",
"app2-job2",
]);
});
});
describe("Error handling", () => {
it("should reject job on handler error", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
queue.setHandler(async () => {
throw new Error("Test error");
});
await expect(queue.add("group1", { id: "job1" })).rejects.toThrow(
"Test error",
);
});
it("should continue processing other jobs after error", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
const processed: string[] = [];
queue.setHandler(async (data) => {
if (data.id === "job2") {
throw new Error("Job 2 error");
}
processed.push(data.id);
});
await expect(
queue.add("group1", { id: "job1" }),
).resolves.toBeUndefined();
await expect(queue.add("group1", { id: "job2" })).rejects.toThrow();
await expect(
queue.add("group1", { id: "job3" }),
).resolves.toBeUndefined();
await new Promise((resolve) => setTimeout(resolve, 100));
expect(processed).toEqual(["job1", "job3"]);
});
});
describe("Queue management", () => {
it("should clear group tasks", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
const processed: string[] = [];
queue.setHandler(async (data) => {
await new Promise((resolve) => setTimeout(resolve, 50));
processed.push(data.id);
});
// Add jobs without awaiting - they'll start processing
const job1Promise = queue.add("group1", { id: "job1" });
const job2Promise = queue.add("group1", { id: "job2" });
// Clear immediately - job1 might be processing, but job2 should be cleared
queue.clearGroup("group1");
// Use Promise.allSettled to handle both promises properly
const results = await Promise.allSettled([job1Promise, job2Promise]);
// job1 might succeed or fail depending on timing
// job2 should be rejected
const job2Result = results[1];
if (job2Result.status === "rejected") {
expect(job2Result.reason.message).toBe("Queue cleared");
}
await new Promise((resolve) => setTimeout(resolve, 100));
// Job1 might have processed, but job2 should not
expect(processed.length).toBeLessThanOrEqual(1);
});
it("should return correct group length", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
queue.setHandler(async () => {
await new Promise((resolve) => setTimeout(resolve, 100));
});
// Add jobs without awaiting - check length immediately
const promises = [
queue.add("group1", { id: "job1" }),
queue.add("group1", { id: "job2" }),
queue.add("group1", { id: "job3" }),
];
// Check length immediately - at least some should be pending
// (job1 might be processing, but job2 and job3 should be pending)
const length = queue.getGroupLength("group1");
expect(length).toBeGreaterThanOrEqual(0);
// Wait for all to complete
await Promise.all(promises);
await new Promise((resolve) => setTimeout(resolve, 50));
// After processing should be 0
expect(queue.getGroupLength("group1")).toBe(0);
});
it("should close queue and reject pending tasks", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
queue.setHandler(async () => {
await new Promise((resolve) => setTimeout(resolve, 100));
});
// Add first job and wait a bit to ensure it starts processing
const job1Promise = queue.add("group1", { id: "job1" });
// Add second job without awaiting
const job2Promise = queue.add("group1", { id: "job2" });
// Wait a tiny bit to ensure job2 is queued
await new Promise((resolve) => setTimeout(resolve, 10));
// Close queue - job2 should be rejected
await queue.close();
// Use Promise.allSettled to handle both promises properly
const results = await Promise.allSettled([job1Promise, job2Promise]);
// job1 might succeed or fail depending on timing
// job2 should be rejected
const job2Result = results[1];
if (job2Result.status === "rejected") {
expect(job2Result.reason.message).toBe("Queue closed");
}
});
});
describe("Concurrency edge cases", () => {
it("should handle concurrency 1 with 1 app correctly", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
const processed: string[] = [];
queue.setHandler(async (data) => {
await new Promise((resolve) => setTimeout(resolve, 50));
processed.push(data.id);
});
await queue.add("app1", { id: "job1" });
await new Promise((resolve) => setTimeout(resolve, 100));
expect(processed).toEqual(["job1"]);
expect(queue.getActiveGroupsCount()).toBe(0);
});
it("should handle concurrency 1 with 3 apps correctly", async () => {
const queue = new GroupedQueue<{ id: string; app: string }>(1);
const processingTimes: Map<string, { start: number; end: number }> =
new Map();
queue.setHandler(async (data) => {
const start = Date.now();
await new Promise((resolve) => setTimeout(resolve, 50));
const end = Date.now();
processingTimes.set(data.app, { start, end });
});
await Promise.all([
queue.add("app1", { id: "job1", app: "app1" }),
queue.add("app2", { id: "job2", app: "app2" }),
queue.add("app3", { id: "job3", app: "app3" }),
]);
await new Promise((resolve) => setTimeout(resolve, 300));
// Verify sequential processing
const app1 = processingTimes.get("app1")!;
const app2 = processingTimes.get("app2")!;
const app3 = processingTimes.get("app3")!;
expect(app2.start).toBeGreaterThanOrEqual(app1.end);
expect(app3.start).toBeGreaterThanOrEqual(app2.end);
expect(queue.getActiveGroupsCount()).toBe(0);
});
it("should handle 4 apps with concurrency 3 correctly", async () => {
const queue = new GroupedQueue<{ id: string; app: string }>(3);
const concurrentCounts: number[] = [];
queue.setHandler(async () => {
// Track concurrent processing
const interval = setInterval(() => {
concurrentCounts.push(queue.getActiveGroupsCount());
}, 10);
await new Promise((resolve) => setTimeout(resolve, 100));
clearInterval(interval);
});
await Promise.all([
queue.add("app1", { id: "job1", app: "app1" }),
queue.add("app2", { id: "job2", app: "app2" }),
queue.add("app3", { id: "job3", app: "app3" }),
queue.add("app4", { id: "job4", app: "app4" }),
]);
await new Promise((resolve) => setTimeout(resolve, 200));
// Should never exceed concurrency of 3
const maxConcurrent = Math.max(...concurrentCounts);
expect(maxConcurrent).toBeLessThanOrEqual(3);
expect(queue.getActiveGroupsCount()).toBe(0);
});
});
describe("Idle state", () => {
it("should be idle when no jobs are processing", () => {
const queue = new GroupedQueue<{ id: string }>(1);
expect(queue.isIdle()).toBe(true);
});
it("should not be idle while processing", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
let isIdleDuringProcessing = false;
queue.setHandler(async () => {
isIdleDuringProcessing = queue.isIdle();
await new Promise((resolve) => setTimeout(resolve, 50));
});
await queue.add("group1", { id: "job1" });
await new Promise((resolve) => setTimeout(resolve, 30));
expect(isIdleDuringProcessing).toBe(false);
expect(queue.isIdle()).toBe(true);
});
});
describe("Concurrency management", () => {
it("should get current concurrency", () => {
const queue1 = new GroupedQueue<{ id: string }>(1);
const queue2 = new GroupedQueue<{ id: string }>(5);
const queue3 = new GroupedQueue<{ id: string }>(10);
expect(queue1.getConcurrency()).toBe(1);
expect(queue2.getConcurrency()).toBe(5);
expect(queue3.getConcurrency()).toBe(10);
});
it("should set concurrency dynamically", () => {
const queue = new GroupedQueue<{ id: string }>(1);
expect(queue.getConcurrency()).toBe(1);
queue.setConcurrency(3);
expect(queue.getConcurrency()).toBe(3);
queue.setConcurrency(5);
expect(queue.getConcurrency()).toBe(5);
});
it("should throw error when setting concurrency less than 1", () => {
const queue = new GroupedQueue<{ id: string }>(1);
expect(() => queue.setConcurrency(0)).toThrow(
"Concurrency must be at least 1",
);
expect(() => queue.setConcurrency(-1)).toThrow(
"Concurrency must be at least 1",
);
});
it("should process next group when concurrency increases", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
const processed: string[] = [];
queue.setHandler(async (data) => {
await new Promise((resolve) => setTimeout(resolve, 50));
processed.push(data.id);
});
// Add jobs to 3 different groups with concurrency 1
const job1Promise = queue.add("group1", { id: "job1" });
const job2Promise = queue.add("group2", { id: "job2" });
const job3Promise = queue.add("group3", { id: "job3" });
// Wait a bit to ensure job1 starts processing
await new Promise((resolve) => setTimeout(resolve, 10));
// Increase concurrency to 3 - should allow group2 and group3 to start
queue.setConcurrency(3);
// Wait for all to complete
await Promise.all([job1Promise, job2Promise, job3Promise]);
await new Promise((resolve) => setTimeout(resolve, 100));
expect(processed).toHaveLength(3);
expect(processed).toContain("job1");
expect(processed).toContain("job2");
expect(processed).toContain("job3");
});
});
describe("Clear all pending tasks", () => {
it("should clear all pending tasks across all groups", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
const processed: string[] = [];
queue.setHandler(async (data) => {
await new Promise((resolve) => setTimeout(resolve, 100));
processed.push(data.id);
});
// Add multiple jobs to different groups
const job1Promise = queue.add("group1", { id: "job1" });
const job2Promise = queue.add("group1", { id: "job2" });
const job3Promise = queue.add("group2", { id: "job3" });
const job4Promise = queue.add("group2", { id: "job4" });
const job5Promise = queue.add("group3", { id: "job5" });
// Wait a bit to ensure job1 starts processing
await new Promise((resolve) => setTimeout(resolve, 10));
// Clear all pending tasks
const clearedCount = queue.clearAllPendingTasks();
// Should have cleared 4 pending tasks (job2, job3, job4, job5)
// job1 is processing so it's not in the queue anymore
expect(clearedCount).toBe(4);
// Handle all promises
const results = await Promise.allSettled([
job1Promise,
job2Promise,
job3Promise,
job4Promise,
job5Promise,
]);
// job1 should succeed (it was processing)
const job1Result = results[0];
expect(job1Result.status).toBe("fulfilled");
// All pending jobs should be rejected
for (let i = 1; i < results.length; i++) {
const result = results[i];
if (result && result.status === "rejected") {
expect(result.reason.message).toBe(
"Concurrency changed - queue cleared",
);
}
}
// Wait for job1 to complete
await new Promise((resolve) => setTimeout(resolve, 150));
// Only job1 should have processed
expect(processed).toHaveLength(1);
expect(processed).toContain("job1");
});
it("should not clear tasks that are currently processing", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
const processed: string[] = [];
queue.setHandler(async (data) => {
await new Promise((resolve) => setTimeout(resolve, 100));
processed.push(data.id);
});
// Add jobs - first one will start processing immediately
const job1Promise = queue.add("group1", { id: "job1" });
const job2Promise = queue.add("group1", { id: "job2" });
// Wait to ensure job1 is processing (it's been shifted from tasks)
await new Promise((resolve) => setTimeout(resolve, 20));
// Clear all pending - should only clear job2, not job1
// job1 is already executing (not in tasks array)
const clearedCount = queue.clearAllPendingTasks();
expect(clearedCount).toBe(1);
// Handle all promises
const results = await Promise.allSettled([job1Promise, job2Promise]);
// job1 should succeed (it was processing)
const job1Result = results[0];
expect(job1Result.status).toBe("fulfilled");
// job2 should be rejected
const job2Result = results[1];
if (job2Result && job2Result.status === "rejected") {
expect(job2Result.reason.message).toBe(
"Concurrency changed - queue cleared",
);
}
await new Promise((resolve) => setTimeout(resolve, 50));
// Only job1 should have processed
expect(processed).toHaveLength(1);
expect(processed).toContain("job1");
});
it("should return 0 when no pending tasks", () => {
const queue = new GroupedQueue<{ id: string }>(1);
const clearedCount = queue.clearAllPendingTasks();
expect(clearedCount).toBe(0);
});
it("should clear tasks from multiple groups", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
const processed: string[] = [];
queue.setHandler(async (data) => {
await new Promise((resolve) => setTimeout(resolve, 50));
processed.push(data.id);
});
// Add jobs to multiple groups
const promises = [
queue.add("group1", { id: "job1" }),
queue.add("group1", { id: "job2" }),
queue.add("group2", { id: "job3" }),
queue.add("group2", { id: "job4" }),
queue.add("group3", { id: "job5" }),
];
// Wait a bit for first job to start (it gets shifted from tasks)
await new Promise((resolve) => setTimeout(resolve, 10));
// Clear all pending
const clearedCount = queue.clearAllPendingTasks();
// Should clear 4 tasks (job2, job3, job4, job5)
// job1 is processing so it's not in the queue anymore
expect(clearedCount).toBe(4);
// Handle all promises
const results = await Promise.allSettled(promises);
// job1 should succeed
const job1Result = results[0];
expect(job1Result?.status).toBe("fulfilled");
// Others should be rejected
for (let i = 1; i < results.length; i++) {
const result = results[i];
if (result && result.status === "rejected") {
expect(result.reason.message).toBe(
"Concurrency changed - queue cleared",
);
}
}
await new Promise((resolve) => setTimeout(resolve, 100));
// Only first job should process
expect(processed.length).toBeLessThanOrEqual(1);
});
});
describe("Concurrency change with pending tasks", () => {
it("should clear pending tasks when concurrency changes", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
const processed: string[] = [];
queue.setHandler(async (data) => {
await new Promise((resolve) => setTimeout(resolve, 50));
processed.push(data.id);
});
// Add jobs with concurrency 1
const job1Promise = queue.add("group1", { id: "job1" });
const job2Promise = queue.add("group1", { id: "job2" });
const job3Promise = queue.add("group2", { id: "job3" });
// Wait for job1 to start processing (it gets shifted from tasks)
await new Promise((resolve) => setTimeout(resolve, 10));
// Change concurrency - should clear pending tasks via clearAllPendingTasks
queue.setConcurrency(3);
// Handle all promises
const results = await Promise.allSettled([
job1Promise,
job2Promise,
job3Promise,
]);
// job1 should succeed (it was processing)
const job1Result = results[0];
expect(job1Result.status).toBe("fulfilled");
// Pending jobs should be rejected (job2 and job3 were in queue when cleared)
const job2Result = results[1];
const job3Result = results[2];
// At least one of the pending jobs should be rejected
const rejectedCount = [job2Result, job3Result].filter(
(r) => r && r.status === "rejected",
).length;
expect(rejectedCount).toBeGreaterThan(0);
// Verify rejection messages
if (job2Result && job2Result.status === "rejected") {
expect(job2Result.reason.message).toBe(
"Concurrency changed - queue cleared",
);
}
if (job3Result && job3Result.status === "rejected") {
expect(job3Result.reason.message).toBe(
"Concurrency changed - queue cleared",
);
}
await new Promise((resolve) => setTimeout(resolve, 100));
// job1 should have processed, others may or may not depending on timing
expect(processed.length).toBeGreaterThanOrEqual(1);
expect(processed).toContain("job1");
});
it("should allow new jobs after concurrency change", async () => {
const queue = new GroupedQueue<{ id: string }>(1);
const processed: string[] = [];
queue.setHandler(async (data) => {
await new Promise((resolve) => setTimeout(resolve, 50));
processed.push(data.id);
});
// Add job with concurrency 1
const job1Promise = queue.add("group1", { id: "job1" });
const job2Promise = queue.add("group1", { id: "job2" });
// Wait for job1 to start (it gets shifted from tasks)
await new Promise((resolve) => setTimeout(resolve, 10));
// Change concurrency to 3 - this calls clearAllPendingTasks internally
queue.setConcurrency(3);
// Handle all promises
const results = await Promise.allSettled([job1Promise, job2Promise]);
// job1 should succeed (it was processing)
const job1Result = results[0];
expect(job1Result.status).toBe("fulfilled");
// job2 should be rejected (it was in queue when cleared)
const job2Result = results[1];
if (job2Result && job2Result.status === "rejected") {
expect(job2Result.reason.message).toBe(
"Concurrency changed - queue cleared",
);
} else {
// If job2 wasn't rejected, it means it started processing before clear
// This is acceptable as it's a timing issue
}
// Add new jobs after concurrency change - they should work
await Promise.all([
queue.add("group2", { id: "job3" }),
queue.add("group3", { id: "job4" }),
]);
await new Promise((resolve) => setTimeout(resolve, 100));
// job1, job3, and job4 should have processed
expect(processed.length).toBeGreaterThanOrEqual(2);
expect(processed).toContain("job1");
});
});
});
@@ -0,0 +1,313 @@
import { beforeEach, describe, expect, it } from "vitest";
import { QueueManager } from "../../server/queues/queue-manager";
describe("QueueManager", () => {
let manager: QueueManager;
beforeEach(() => {
manager = new QueueManager();
});
describe("Queue creation and retrieval", () => {
it("should create a queue with default concurrency 1", () => {
const queue = manager.getQueue("test-queue");
expect(queue.getConcurrency()).toBe(1);
});
it("should create a queue with custom concurrency", () => {
const queue = manager.getQueue("test-queue", 5);
expect(queue.getConcurrency()).toBe(5);
});
it("should return the same queue instance for the same name", () => {
const queue1 = manager.getQueue("test-queue", 3);
const queue2 = manager.getQueue("test-queue", 5);
expect(queue1).toBe(queue2);
// Concurrency should remain as first set
expect(queue1.getConcurrency()).toBe(3);
});
it("should create different queues for different names", () => {
const queue1 = manager.getQueue("queue1", 2);
const queue2 = manager.getQueue("queue2", 4);
expect(queue1).not.toBe(queue2);
expect(queue1.getConcurrency()).toBe(2);
expect(queue2.getConcurrency()).toBe(4);
});
});
describe("Handler management", () => {
it("should set handler for a queue", async () => {
const processed: string[] = [];
manager.setHandler("test-queue", async (data: { id: string }) => {
processed.push(data.id);
});
await manager.add("test-queue", "group1", { id: "job1" });
await new Promise((resolve) => setTimeout(resolve, 50));
expect(processed).toEqual(["job1"]);
});
it("should handle different handlers for different queues", async () => {
const queue1Processed: string[] = [];
const queue2Processed: string[] = [];
manager.setHandler("queue1", async (data: { id: string }) => {
queue1Processed.push(data.id);
});
manager.setHandler("queue2", async (data: { id: string }) => {
queue2Processed.push(data.id);
});
await Promise.all([
manager.add("queue1", "group1", { id: "job1" }),
manager.add("queue2", "group1", { id: "job2" }),
]);
await new Promise((resolve) => setTimeout(resolve, 50));
expect(queue1Processed).toEqual(["job1"]);
expect(queue2Processed).toEqual(["job2"]);
});
});
describe("Job management", () => {
it("should add jobs to correct queue and group", async () => {
const processed: string[] = [];
manager.setHandler("test-queue", async (data: { id: string }) => {
processed.push(data.id);
});
await manager.add("test-queue", "group1", { id: "job1" });
await manager.add("test-queue", "group2", { id: "job2" });
await new Promise((resolve) => setTimeout(resolve, 50));
expect(processed).toContain("job1");
expect(processed).toContain("job2");
});
it("should create queue with concurrency when adding job", async () => {
const processed: string[] = [];
// Create queue with concurrency first (without handler)
manager.getQueue("new-queue", 3);
// Set handler
manager.setHandler("new-queue", async (data: { id: string }) => {
processed.push(data.id);
});
// Now add job - it should process
await manager.add("new-queue", "group1", { id: "job1" });
await new Promise((resolve) => setTimeout(resolve, 50));
const queue = manager.getQueue("new-queue");
expect(queue.getConcurrency()).toBe(3);
expect(processed).toEqual(["job1"]);
});
});
describe("Queue operations", () => {
it("should clear group in specific queue", async () => {
const processed: string[] = [];
manager.setHandler("test-queue", async (data: { id: string }) => {
await new Promise((resolve) => setTimeout(resolve, 100));
processed.push(data.id);
});
// Add jobs but don't await - they'll start processing
const job1Promise = manager.add("test-queue", "group1", { id: "job1" });
const job2Promise = manager.add("test-queue", "group1", { id: "job2" });
// Clear immediately - job1 might be processing, but job2 should be cleared
manager.clearGroup("test-queue", "group1");
// Use Promise.allSettled to handle both promises properly
const results = await Promise.allSettled([job1Promise, job2Promise]);
// job1 might succeed or fail depending on timing
// job2 should be rejected
const job2Result = results[1];
if (job2Result.status === "rejected") {
expect(job2Result.reason.message).toBe("Queue cleared");
}
await new Promise((resolve) => setTimeout(resolve, 150));
// Job1 might have processed, but job2 should not
expect(processed.length).toBeLessThanOrEqual(1);
});
it("should get group length for specific queue", async () => {
manager.setHandler("test-queue", async () => {
await new Promise((resolve) => setTimeout(resolve, 100));
});
// Add jobs without awaiting - check length immediately
const job1Promise = manager.add("test-queue", "group1", { id: "job1" });
const job2Promise = manager.add("test-queue", "group1", { id: "job2" });
// Check length immediately - at least one should be pending
// (job1 might be processing, but job2 should be pending)
const length = manager.getGroupLength("test-queue", "group1");
expect(length).toBeGreaterThanOrEqual(0);
// Wait for both to complete
await Promise.all([job1Promise, job2Promise]);
await new Promise((resolve) => setTimeout(resolve, 50));
expect(manager.getGroupLength("test-queue", "group1")).toBe(0);
});
it("should get total length for specific queue", async () => {
manager.setHandler("test-queue", async () => {
await new Promise((resolve) => setTimeout(resolve, 50));
});
// Add jobs without awaiting - check length immediately
const promises = [
manager.add("test-queue", "group1", { id: "job1" }),
manager.add("test-queue", "group2", { id: "job2" }),
manager.add("test-queue", "group3", { id: "job3" }),
];
// Check length immediately - at least some should be pending
const length = manager.getTotalLength("test-queue");
expect(length).toBeGreaterThanOrEqual(0);
// Wait for all to complete
await Promise.all(promises);
await new Promise((resolve) => setTimeout(resolve, 100));
expect(manager.getTotalLength("test-queue")).toBe(0);
});
it("should check if queue is idle", async () => {
manager.setHandler("test-queue", async () => {
await new Promise((resolve) => setTimeout(resolve, 50));
});
expect(manager.isIdle("test-queue")).toBe(true);
await manager.add("test-queue", "group1", { id: "job1" });
await new Promise((resolve) => setTimeout(resolve, 100));
expect(manager.isIdle("test-queue")).toBe(true);
});
});
describe("Queue lifecycle", () => {
it("should close a specific queue", async () => {
manager.setHandler("test-queue", async () => {
await new Promise((resolve) => setTimeout(resolve, 100));
});
// Add first job and wait a bit to ensure it starts processing
const job1Promise = manager.add("test-queue", "group1", { id: "job1" });
// Add second job without awaiting
const job2Promise = manager.add("test-queue", "group1", { id: "job2" });
// Wait a tiny bit to ensure job2 is queued
await new Promise((resolve) => setTimeout(resolve, 10));
// Close queue - job2 should be rejected
await manager.closeQueue("test-queue");
// Use Promise.allSettled to handle both promises properly
const results = await Promise.allSettled([job1Promise, job2Promise]);
// job1 might succeed or fail depending on timing
// job2 should be rejected
const job2Result = results[1];
if (job2Result.status === "rejected") {
expect(job2Result.reason.message).toBe("Queue closed");
}
expect(manager.getQueueNames()).not.toContain("test-queue");
});
it("should close all queues", async () => {
manager.setHandler("queue1", async () => {
await new Promise((resolve) => setTimeout(resolve, 100));
});
manager.setHandler("queue2", async () => {
await new Promise((resolve) => setTimeout(resolve, 100));
});
await manager.add("queue1", "group1", { id: "job1" });
await manager.add("queue2", "group1", { id: "job2" });
await manager.closeAll();
expect(manager.getQueueNames()).toHaveLength(0);
});
it("should get all queue names", () => {
manager.getQueue("queue1");
manager.getQueue("queue2");
manager.getQueue("queue3");
const names = manager.getQueueNames();
expect(names).toContain("queue1");
expect(names).toContain("queue2");
expect(names).toContain("queue3");
expect(names).toHaveLength(3);
});
});
describe("Multiple queues with different concurrency", () => {
it("should handle multiple queues with different concurrency settings", async () => {
const queue1Processed: string[] = [];
const queue2Processed: string[] = [];
// Create queues with specific concurrency FIRST, before setting handlers
const queue1 = manager.getQueue("queue1", 1);
const queue2 = manager.getQueue("queue2", 3);
// Verify concurrency is set correctly before proceeding
expect(queue1.getConcurrency()).toBe(1);
expect(queue2.getConcurrency()).toBe(3);
manager.setHandler("queue1", async (data: { id: string }) => {
await new Promise((resolve) => setTimeout(resolve, 50));
queue1Processed.push(data.id);
});
manager.setHandler("queue2", async (data: { id: string }) => {
await new Promise((resolve) => setTimeout(resolve, 50));
queue2Processed.push(data.id);
});
// Queue1 with concurrency 1 (sequential)
await Promise.all([
manager.add("queue1", "app1", { id: "job1" }),
manager.add("queue1", "app2", { id: "job2" }),
]);
// Queue2 with concurrency 3 (parallel)
await Promise.all([
manager.add("queue2", "app1", { id: "job1" }),
manager.add("queue2", "app2", { id: "job2" }),
manager.add("queue2", "app3", { id: "job3" }),
]);
await new Promise((resolve) => setTimeout(resolve, 200));
expect(queue1Processed).toHaveLength(2);
expect(queue2Processed).toHaveLength(3);
// Verify concurrency settings are still correct
expect(manager.getQueue("queue1").getConcurrency()).toBe(1);
expect(manager.getQueue("queue2").getConcurrency()).toBe(3);
});
});
});
@@ -0,0 +1,250 @@
import { beforeEach, describe, expect, it } from "vitest";
import type { DeploymentJob } from "../../server/queues/queue-types";
import {
getConcurrency,
myQueue,
setConcurrency,
} from "../../server/queues/queueSetup";
describe("queueSetup", () => {
beforeEach(() => {
// Reset concurrency to default (1) before each test
setConcurrency(1);
// Clear all pending tasks
myQueue.clearAllPendingTasks();
});
describe("getConcurrency", () => {
it("should return default concurrency of 1", () => {
const concurrency = getConcurrency();
expect(concurrency).toBe(1);
});
it("should return current concurrency after setting it", () => {
setConcurrency(3);
expect(getConcurrency()).toBe(3);
setConcurrency(5);
expect(getConcurrency()).toBe(5);
});
});
describe("setConcurrency", () => {
it("should set concurrency successfully", () => {
const clearedCount = setConcurrency(3);
expect(getConcurrency()).toBe(3);
expect(clearedCount).toBe(0); // No pending tasks to clear
});
it("should throw error for concurrency less than 1", () => {
expect(() => setConcurrency(0)).toThrow("Concurrency must be at least 1");
expect(() => setConcurrency(-1)).toThrow(
"Concurrency must be at least 1",
);
});
it("should return 0 cleared builds when no pending tasks", () => {
const clearedCount = setConcurrency(2);
expect(clearedCount).toBe(0);
expect(getConcurrency()).toBe(2);
});
it("should clear pending builds when concurrency changes", async () => {
const processed: string[] = [];
// Set handler
myQueue.setHandler(async (job: DeploymentJob) => {
await new Promise((resolve) => setTimeout(resolve, 100));
if (job.applicationType === "application") {
processed.push(job.applicationId);
} else if (job.applicationType === "compose") {
processed.push(job.composeId);
} else if (job.applicationType === "application-preview") {
processed.push(job.previewDeploymentId);
}
});
// Add jobs to different groups
const job1: DeploymentJob = {
applicationId: "app1",
titleLog: "Test",
descriptionLog: "Test",
type: "deploy",
applicationType: "application",
server: false,
};
const job2: DeploymentJob = {
applicationId: "app2",
titleLog: "Test",
descriptionLog: "Test",
type: "deploy",
applicationType: "application",
server: false,
};
const job3: DeploymentJob = {
applicationId: "app3",
titleLog: "Test",
descriptionLog: "Test",
type: "deploy",
applicationType: "application",
server: false,
};
// Add jobs without awaiting
const promise1 = myQueue.add("application:app1", job1);
const promise2 = myQueue.add("application:app2", job2);
const promise3 = myQueue.add("application:app3", job3);
// Wait for first job to start processing
await new Promise((resolve) => setTimeout(resolve, 10));
// Change concurrency - should clear pending builds
const clearedCount = setConcurrency(3);
// Should have cleared 2 pending builds (app2 and app3)
expect(clearedCount).toBe(2);
expect(getConcurrency()).toBe(3);
// Handle all promises - use allSettled to handle both resolved and rejected
const results = await Promise.allSettled([promise1, promise2, promise3]);
// job1 should succeed (it was processing), others should be rejected
const job1Result = results[0];
if (job1Result.status === "fulfilled") {
// Job1 completed successfully
}
// Pending jobs should be rejected
const job2Result = results[1];
const job3Result = results[2];
if (job2Result && job2Result.status === "rejected") {
expect(job2Result.reason.message).toBe(
"Concurrency changed - queue cleared",
);
}
if (job3Result && job3Result.status === "rejected") {
expect(job3Result.reason.message).toBe(
"Concurrency changed - queue cleared",
);
}
await new Promise((resolve) => setTimeout(resolve, 150));
// Only first job should have processed
expect(processed.length).toBeLessThanOrEqual(1);
});
it("should not clear builds when concurrency doesn't change", async () => {
// Set to 2
setConcurrency(2);
expect(getConcurrency()).toBe(2);
// Set to 2 again - should not clear anything
const clearedCount = setConcurrency(2);
expect(clearedCount).toBe(0);
expect(getConcurrency()).toBe(2);
});
it("should allow new jobs after concurrency change", async () => {
const processed: string[] = [];
myQueue.setHandler(async (job: DeploymentJob) => {
await new Promise((resolve) => setTimeout(resolve, 50));
if (job.applicationType === "application") {
processed.push(job.applicationId);
}
});
// Add job with concurrency 1
const job1: DeploymentJob = {
applicationId: "app1",
titleLog: "Test",
descriptionLog: "Test",
type: "deploy",
applicationType: "application",
server: false,
};
const job2: DeploymentJob = {
applicationId: "app2",
titleLog: "Test",
descriptionLog: "Test",
type: "deploy",
applicationType: "application",
server: false,
};
const promise1 = myQueue.add("application:app1", job1);
const promise2 = myQueue.add("application:app2", job2);
// Wait for first job to start
await new Promise((resolve) => setTimeout(resolve, 10));
// Change concurrency to 3
const clearedCount = setConcurrency(3);
expect(clearedCount).toBe(1); // app2 should be cleared
// Handle all promises - use allSettled to handle both resolved and rejected
const results = await Promise.allSettled([promise1, promise2]);
// job1 should succeed (it was processing)
const job1Result = results[0];
if (job1Result.status === "fulfilled") {
// Job1 completed successfully
}
// app2 should be rejected
const job2Result = results[1];
if (job2Result.status === "rejected") {
expect(job2Result.reason.message).toBe(
"Concurrency changed - queue cleared",
);
}
// Add new jobs after concurrency change - they should work
const job3: DeploymentJob = {
applicationId: "app3",
titleLog: "Test",
descriptionLog: "Test",
type: "deploy",
applicationType: "application",
server: false,
};
const job4: DeploymentJob = {
applicationId: "app4",
titleLog: "Test",
descriptionLog: "Test",
type: "deploy",
applicationType: "application",
server: false,
};
await Promise.all([
myQueue.add("application:app3", job3),
myQueue.add("application:app4", job4),
]);
await new Promise((resolve) => setTimeout(resolve, 150));
// app1, app3, and app4 should have processed
expect(processed.length).toBeGreaterThanOrEqual(2);
expect(processed).toContain("app1");
});
it("should handle multiple concurrency changes correctly", () => {
// Start at 1
expect(getConcurrency()).toBe(1);
// Change to 3
setConcurrency(3);
expect(getConcurrency()).toBe(3);
// Change to 5
setConcurrency(5);
expect(getConcurrency()).toBe(5);
// Change back to 1
setConcurrency(1);
expect(getConcurrency()).toBe(1);
});
});
});
@@ -79,7 +79,7 @@ export const ShowGeneralApplication = ({ applicationId }: Props) => {
>
<Button
variant="default"
isLoading={data?.applicationStatus === "running"}
// isLoading={data?.applicationStatus === "running"}
className="flex items-center gap-1.5 group focus-visible:ring-2 focus-visible:ring-offset-2"
>
<Tooltip>
@@ -15,6 +15,7 @@ import { api } from "@/utils/api";
import { ShowModalLogs } from "../../web-server/show-modal-logs";
import { TerminalModal } from "../../web-server/terminal-modal";
import { GPUSupportModal } from "../gpu-support-modal";
import { ChangeConcurrencyModal } from "../change-concurrency-modal";
export const ShowDokployActions = () => {
const { t } = useTranslation("settings");
@@ -101,6 +102,14 @@ export const ShowDokployActions = () => {
>
Reload Redis
</DropdownMenuItem>
<ChangeConcurrencyModal>
<DropdownMenuItem
className="cursor-pointer"
onSelect={(e) => e.preventDefault()}
>
Change Concurrency
</DropdownMenuItem>
</ChangeConcurrencyModal>
</DropdownMenuGroup>
</DropdownMenuContent>
</DropdownMenu>
@@ -7,9 +7,11 @@ import {
DialogTrigger,
} from "@/components/ui/dialog";
import { DropdownMenuItem } from "@/components/ui/dropdown-menu";
import { Button } from "@/components/ui/button";
import { ShowStorageActions } from "./show-storage-actions";
import { ShowTraefikActions } from "./show-traefik-actions";
import { ToggleDockerCleanup } from "./toggle-docker-cleanup";
import { ChangeConcurrencyModal } from "../change-concurrency-modal";
interface Props {
serverId: string;
@@ -37,6 +39,16 @@ export const ShowServerActions = ({ serverId }: Props) => {
<ShowTraefikActions serverId={serverId} />
<ShowStorageActions serverId={serverId} />
<ToggleDockerCleanup serverId={serverId} />
<div className="col-span-2">
<ChangeConcurrencyModal
serverId={serverId}
trigger={
<Button variant="outline" className="w-full">
Change Concurrency
</Button>
}
/>
</div>
</div>
</DialogContent>
</Dialog>
@@ -0,0 +1,180 @@
"use client";
import { InfoIcon, Loader2 } from "lucide-react";
import { useState } from "react";
import { toast } from "sonner";
import { Alert, AlertDescription } from "@/components/ui/alert";
import { Button } from "@/components/ui/button";
import {
Dialog,
DialogContent,
DialogDescription,
DialogFooter,
DialogHeader,
DialogTitle,
DialogTrigger,
} from "@/components/ui/dialog";
import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
import { api } from "@/utils/api";
interface Props {
serverId?: string;
trigger?: React.ReactNode;
}
export const ChangeConcurrencyModal = ({ serverId, trigger }: Props) => {
const [isOpen, setIsOpen] = useState(false);
const [concurrency, setConcurrency] = useState<number | "">("");
const { data, isLoading: isLoadingCurrent } =
api.settings.getDeploymentConcurrency.useQuery(
{ serverId },
{
enabled: isOpen,
onSuccess: (data) => {
if (concurrency === "") {
setConcurrency(data.concurrency);
}
},
},
);
const { mutateAsync, isLoading } =
api.settings.setDeploymentConcurrency.useMutation();
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
if (
typeof concurrency !== "number" ||
concurrency < 1 ||
concurrency > 20
) {
toast.error("Concurrency must be between 1 and 20");
return;
}
try {
const result = await mutateAsync({ concurrency, serverId });
if (result.clearedBuilds > 0) {
toast.warning(
`Concurrency updated. ${result.clearedBuilds} pending build${result.clearedBuilds > 1 ? "s were" : " was"} cancelled.`,
);
} else {
toast.success("Concurrency updated successfully");
}
setIsOpen(false);
} catch (error) {
toast.error("Failed to update concurrency");
}
};
const serverType = serverId ? "Remote Server" : "Dokploy Server";
return (
<Dialog open={isOpen} onOpenChange={setIsOpen}>
<DialogTrigger asChild>
{trigger || (
<Button variant="outline" size="sm">
Change Concurrency
</Button>
)}
</DialogTrigger>
<DialogContent className="sm:max-w-md">
<DialogHeader>
<DialogTitle>Deployment Concurrency - {serverType}</DialogTitle>
<DialogDescription>
Configure how many deployments can run simultaneously on this
server.
</DialogDescription>
</DialogHeader>
<form onSubmit={handleSubmit} className="space-y-4">
<div className="space-y-2">
<Label htmlFor="concurrency">Concurrency</Label>
<Input
id="concurrency"
type="number"
min={1}
max={20}
value={concurrency}
onChange={(e) => {
const value = e.target.value;
setConcurrency(value === "" ? "" : Number.parseInt(value, 10));
}}
placeholder="Enter concurrency (1-20)"
disabled={isLoading || isLoadingCurrent}
/>
{isLoadingCurrent && (
<div className="flex items-center gap-2 text-sm text-muted-foreground">
<Loader2 className="h-4 w-4 animate-spin" />
Loading current concurrency...
</div>
)}
{!isLoadingCurrent && data && (
<p className="text-sm text-muted-foreground">
Current: {data.concurrency}
</p>
)}
</div>
<div className="space-y-3">
<Alert>
<InfoIcon className="h-4 w-4" />
<AlertDescription className="text-sm">
<div className="space-y-1 mt-1">
<p>
<strong>Default:</strong> 1 deployment at a time
(sequential)
</p>
<p>
<strong>Higher values:</strong> More deployments in
parallel, but will use more RAM and CPU resources.
</p>
{serverId && (
<p className="text-muted-foreground text-xs mt-2">
This setting applies to deployments on this remote server.
</p>
)}
{!serverId && (
<p className="text-muted-foreground text-xs mt-2">
This setting applies to deployments on the Dokploy server.
</p>
)}
</div>
</AlertDescription>
</Alert>
<Alert variant="destructive">
<InfoIcon className="h-4 w-4" />
<AlertDescription className="text-sm font-medium">
<strong>Warning:</strong> Changing concurrency will cancel all
pending builds. Currently running builds will continue, but
queued builds will be cancelled.
</AlertDescription>
</Alert>
</div>
<DialogFooter>
<Button
type="button"
variant="outline"
onClick={() => setIsOpen(false)}
disabled={isLoading}
>
Cancel
</Button>
<Button type="submit" disabled={isLoading || isLoadingCurrent}>
{isLoading ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Updating...
</>
) : (
"Update Concurrency"
)}
</Button>
</DialogFooter>
</form>
</DialogContent>
</Dialog>
);
};
@@ -253,12 +253,8 @@ export default async function handler(
return true;
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
`application:${jobData.applicationId}`,
jobData,
);
} catch (error) {
res.status(400).json({ message: "Error deploying Application", error });
@@ -183,12 +183,8 @@ export default async function handler(
return true;
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
`compose:${jobData.composeId}`,
jobData,
);
} catch (error) {
res.status(400).json({ message: "Error deploying Compose", error });
+10 -30
View File
@@ -132,12 +132,8 @@ export default async function handler(
continue;
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
`application:${jobData.applicationId}`,
jobData,
);
}
@@ -170,12 +166,8 @@ export default async function handler(
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
`compose:${jobData.composeId}`,
jobData,
);
}
@@ -250,12 +242,8 @@ export default async function handler(
continue;
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
`application:${jobData.applicationId}`,
jobData,
);
}
@@ -296,12 +284,8 @@ export default async function handler(
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
`compose:${jobData.composeId}`,
jobData,
);
}
@@ -495,12 +479,8 @@ export default async function handler(
continue;
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
`preview:${jobData.previewDeploymentId}`,
jobData,
);
}
return res.status(200).json({ message: "Apps Deployed" });
+11 -25
View File
@@ -58,7 +58,10 @@ import {
applications,
} from "@/server/db/schema";
import type { DeploymentJob } from "@/server/queues/queue-types";
import { cleanQueuesByApplication, myQueue } from "@/server/queues/queueSetup";
import {
addJobAsync,
cleanQueuesByApplication,
} from "@/server/queues/queueSetup";
import { cancelDeployment, deploy } from "@/server/utils/deploy";
import { uploadFileSchema } from "@/utils/schema";
@@ -335,14 +338,9 @@ export const applicationRouter = createTRPCRouter({
await deploy(jobData);
return true;
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
);
// Fire and forget - UI doesn't wait for deployment to complete
addJobAsync(`application:${jobData.applicationId}`, jobData);
return { success: true, message: "Deployment queued" };
}),
saveEnvironment: protectedProcedure
.input(apiSaveEnvironmentVariables)
@@ -700,14 +698,8 @@ export const applicationRouter = createTRPCRouter({
return true;
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
);
addJobAsync(`application:${jobData.applicationId}`, jobData);
return { success: true, message: "Deployment queued" };
}),
cleanQueues: protectedProcedure
@@ -798,14 +790,8 @@ export const applicationRouter = createTRPCRouter({
return true;
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
);
// Fire and forget - UI doesn't wait for deployment to complete
addJobAsync(`application:${jobData.applicationId}`, jobData);
return true;
}),
updateTraefikConfig: protectedProcedure
+3 -17
View File
@@ -59,7 +59,7 @@ import {
compose as composeTable,
} from "@/server/db/schema";
import type { DeploymentJob } from "@/server/queues/queue-types";
import { cleanQueuesByCompose, myQueue } from "@/server/queues/queueSetup";
import { addJobAsync, cleanQueuesByCompose } from "@/server/queues/queueSetup";
import { cancelDeployment, deploy } from "@/server/utils/deploy";
import { generatePassword } from "@/templates/utils";
import { createTRPCRouter, protectedProcedure, publicProcedure } from "../trpc";
@@ -401,14 +401,7 @@ export const composeRouter = createTRPCRouter({
await deploy(jobData);
return true;
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
);
addJobAsync(`compose:${jobData.composeId}`, jobData);
return { success: true, message: "Deployment queued" };
}),
redeploy: protectedProcedure
@@ -437,14 +430,7 @@ export const composeRouter = createTRPCRouter({
await deploy(jobData);
return true;
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
);
addJobAsync(`compose:${jobData.composeId}`, jobData);
return { success: true, message: "Redeployment queued" };
}),
stop: protectedProcedure
@@ -862,4 +862,49 @@ export const settingsRouter = createTRPCRouter({
const ips = process.env.DOKPLOY_CLOUD_IPS?.split(",");
return ips;
}),
getDeploymentConcurrency: adminProcedure
.input(
z.object({
serverId: z.string().optional(),
}),
)
.query(async ({ input }) => {
// For now, remote servers use the same queue as dokploy server
// In the future, we could implement per-server queues
const { getConcurrency } = await import("@/server/queues/queueSetup");
return {
concurrency: getConcurrency(),
serverId: input.serverId,
};
}),
setDeploymentConcurrency: adminProcedure
.input(
z.object({
concurrency: z.number().int().min(1).max(20),
serverId: z.string().optional(),
}),
)
.mutation(async ({ input }) => {
// For now, remote servers use the same queue as dokploy server
// In the future, we could implement per-server queues
const { setConcurrency, getConcurrency } = await import(
"@/server/queues/queueSetup"
);
const currentConcurrency = getConcurrency();
const clearedCount = setConcurrency(input.concurrency);
const serverType = input.serverId ? "remote server" : "Dokploy server";
let message = `${serverType} deployment concurrency updated from ${currentConcurrency} to ${input.concurrency}. Changes take effect immediately.`;
if (clearedCount > 0) {
message += ` ${clearedCount} pending build${clearedCount > 1 ? "s were" : " was"} cancelled due to concurrency change.`;
}
return {
success: true,
message,
concurrency: input.concurrency,
serverId: input.serverId,
clearedBuilds: clearedCount,
};
}),
});
+66 -56
View File
@@ -8,67 +8,77 @@ import {
updateCompose,
updatePreviewDeployment,
} from "@dokploy/server";
import { type Job, Worker } from "bullmq";
import type { DeploymentJob } from "./queue-types";
import { redisConfig } from "./redis-connection";
import { myQueue } from "./queueSetup";
export const deploymentWorker = new Worker(
"deployments",
async (job: Job<DeploymentJob>) => {
try {
if (job.data.applicationType === "application") {
await updateApplicationStatus(job.data.applicationId, "running");
// Set the handler for processing deployment jobs
console.log("Setting deployment queue handler");
myQueue.setHandler(async (job: DeploymentJob) => {
const jobId =
job.applicationType === "application"
? job.applicationId
: job.applicationType === "compose"
? job.composeId
: job.previewDeploymentId;
console.log("Handler called with job:", job.applicationType, jobId);
try {
if (job.applicationType === "application") {
await updateApplicationStatus(job.applicationId, "running");
if (job.data.type === "redeploy") {
await rebuildApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
} else if (job.data.type === "deploy") {
await deployApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
}
} else if (job.data.applicationType === "compose") {
await updateCompose(job.data.composeId, {
composeStatus: "running",
if (job.type === "redeploy") {
await rebuildApplication({
applicationId: job.applicationId,
titleLog: job.titleLog,
descriptionLog: job.descriptionLog,
});
if (job.data.type === "deploy") {
await deployCompose({
composeId: job.data.composeId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
} else if (job.data.type === "redeploy") {
await rebuildCompose({
composeId: job.data.composeId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
}
} else if (job.data.applicationType === "application-preview") {
await updatePreviewDeployment(job.data.previewDeploymentId, {
previewStatus: "running",
} else if (job.type === "deploy") {
await deployApplication({
applicationId: job.applicationId,
titleLog: job.titleLog,
descriptionLog: job.descriptionLog,
});
}
} else if (job.applicationType === "compose") {
await updateCompose(job.composeId, {
composeStatus: "running",
});
if (job.type === "deploy") {
await deployCompose({
composeId: job.composeId,
titleLog: job.titleLog,
descriptionLog: job.descriptionLog,
});
} else if (job.type === "redeploy") {
await rebuildCompose({
composeId: job.composeId,
titleLog: job.titleLog,
descriptionLog: job.descriptionLog,
});
}
} else if (job.applicationType === "application-preview") {
await updatePreviewDeployment(job.previewDeploymentId, {
previewStatus: "running",
});
if (job.type === "deploy") {
await deployPreviewApplication({
applicationId: job.applicationId,
titleLog: job.titleLog,
descriptionLog: job.descriptionLog,
previewDeploymentId: job.previewDeploymentId,
});
if (job.data.type === "deploy") {
await deployPreviewApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
previewDeploymentId: job.data.previewDeploymentId,
});
}
}
} catch (error) {
console.log("Error", error);
}
} catch (error) {
console.log("Error processing deployment job", error);
throw error; // Re-throw to let the queue handle retries if needed
}
});
// Export for compatibility (no longer needed but kept for imports)
export const deploymentWorker = {
run: () => {
// Queue starts processing automatically when jobs are added
console.log("Deployment queue handler initialized");
},
{
autorun: false,
connection: redisConfig,
},
);
};
@@ -0,0 +1,256 @@
/**
* In-memory grouped queue implementation
* Each group processes one job at a time (FIFO per group)
* Multiple groups can process in parallel
*/
type Task<T> = {
data: T;
resolve: () => void;
reject: (error: Error) => void;
};
type GroupQueue<T> = {
tasks: Task<T>[];
processing: boolean;
};
export class GroupedQueue<T> {
private groups: Map<string, GroupQueue<T>> = new Map();
private handler?: (data: T) => Promise<void>;
private concurrency: number;
private activeGroups: Set<string> = new Set();
constructor(concurrency = 4) {
this.concurrency = concurrency;
}
/**
* Set the handler function that processes each job
*/
setHandler(handler: (data: T) => Promise<void>) {
this.handler = handler;
}
/**
* Add a job to a group queue
*/
async add(groupId: string, data: T): Promise<void> {
if (process.env.NODE_ENV !== "test") {
console.log(
`Adding job to group ${groupId}, handler set: ${!!this.handler}`,
);
}
return new Promise((resolve, reject) => {
if (!this.groups.has(groupId)) {
this.groups.set(groupId, {
tasks: [],
processing: false,
});
}
const group = this.groups.get(groupId)!;
group.tasks.push({
data,
resolve,
reject,
});
// Start processing if not already processing and under concurrency limit
if (!group.processing && this.activeGroups.size < this.concurrency) {
this.processGroup(groupId);
}
});
}
/**
* Process jobs in a group queue
*/
private async processGroup(groupId: string): Promise<void> {
const group = this.groups.get(groupId);
if (!group || group.processing) {
return;
}
// Wait for handler to be set if not available
if (!this.handler) {
if (process.env.NODE_ENV !== "test") {
console.log(`Handler not set yet for group ${groupId}, waiting...`);
}
// Retry after a short delay
setTimeout(() => {
if (this.handler && group.tasks.length > 0) {
this.processGroup(groupId);
}
}, 100);
return;
}
// Check concurrency limit
if (this.activeGroups.size >= this.concurrency) {
return;
}
group.processing = true;
this.activeGroups.add(groupId);
if (process.env.NODE_ENV !== "test") {
console.log(`Processing group ${groupId}, tasks: ${group.tasks.length}`);
}
while (group.tasks.length > 0) {
const task = group.tasks.shift()!;
try {
if (process.env.NODE_ENV !== "test") {
console.log(`Executing handler for group ${groupId}`);
}
await this.handler!(task.data);
task.resolve();
if (process.env.NODE_ENV !== "test") {
console.log(`Handler completed for group ${groupId}`);
}
} catch (error) {
if (process.env.NODE_ENV !== "test") {
console.error(`Handler error for group ${groupId}:`, error);
}
task.reject(error instanceof Error ? error : new Error(String(error)));
}
}
group.processing = false;
this.activeGroups.delete(groupId);
// Try to process another group if there are waiting groups
this.processNextGroup();
}
/**
* Process the next available group
*/
private processNextGroup(): void {
if (this.activeGroups.size >= this.concurrency) {
return;
}
// Find a group with pending tasks that's not currently processing
for (const [groupId, group] of this.groups.entries()) {
if (
!group.processing &&
group.tasks.length > 0 &&
!this.activeGroups.has(groupId)
) {
this.processGroup(groupId);
break;
}
}
}
/**
* Remove all tasks for a specific group
*/
clearGroup(groupId: string): void {
const group = this.groups.get(groupId);
if (group) {
// Reject all pending tasks
for (const task of group.tasks) {
task.reject(new Error("Queue cleared"));
}
group.tasks = [];
}
}
/**
* Clear all pending tasks across all groups
* This is useful when changing concurrency settings
* Note: This only clears tasks in the queue, not the currently executing task
*/
clearAllPendingTasks(): number {
let clearedCount = 0;
for (const [groupId, group] of this.groups.entries()) {
// Clear all pending tasks in the queue
// The currently executing task is not in group.tasks (it was already shifted)
if (group.tasks.length > 0) {
clearedCount += group.tasks.length;
for (const task of group.tasks) {
task.reject(new Error("Concurrency changed - queue cleared"));
}
group.tasks = [];
}
}
return clearedCount;
}
/**
* Get the number of pending tasks for a group
*/
getGroupLength(groupId: string): number {
return this.groups.get(groupId)?.tasks.length ?? 0;
}
/**
* Get total number of pending tasks across all groups
*/
getTotalLength(): number {
let total = 0;
for (const group of this.groups.values()) {
total += group.tasks.length;
}
return total;
}
/**
* Check if queue is idle (no active processing)
*/
isIdle(): boolean {
return this.activeGroups.size === 0;
}
/**
* Get the number of active groups (for testing)
*/
getActiveGroupsCount(): number {
return this.activeGroups.size;
}
/**
* Get the concurrency limit
*/
getConcurrency(): number {
return this.concurrency;
}
/**
* Set the concurrency limit dynamically
* This allows changing concurrency without recreating the queue
* WARNING: This will clear all pending tasks when concurrency changes
*/
setConcurrency(concurrency: number): void {
if (concurrency < 1) {
throw new Error("Concurrency must be at least 1");
}
const concurrencyChanged = this.concurrency !== concurrency;
this.concurrency = concurrency;
// If concurrency changed, clear all pending tasks
if (concurrencyChanged) {
this.clearAllPendingTasks();
}
// Process next group if we now have capacity
this.processNextGroup();
}
/**
* Close the queue and reject all pending tasks
*/
async close(): Promise<void> {
for (const [groupId, group] of this.groups.entries()) {
for (const task of group.tasks) {
task.reject(new Error("Queue closed"));
}
group.tasks = [];
}
this.groups.clear();
this.activeGroups.clear();
}
}
+112
View File
@@ -0,0 +1,112 @@
/**
* Queue Manager - Manages multiple dynamic queues
* Each queue can have its own concurrency configuration
*/
import { GroupedQueue } from "./grouped-queue-wrapper";
export class QueueManager {
private queues: Map<string, GroupedQueue<any>> = new Map();
/**
* Get or create a queue with the specified name and concurrency
* Note: If queue already exists, concurrency parameter is ignored
*/
getQueue<T>(name: string, concurrency = 1): GroupedQueue<T> {
if (!this.queues.has(name)) {
this.queues.set(name, new GroupedQueue<T>(concurrency));
}
return this.queues.get(name) as GroupedQueue<T>;
}
/**
* Set handler for a specific queue
*/
setHandler<T>(queueName: string, handler: (data: T) => Promise<void>): void {
const queue = this.getQueue<T>(queueName);
queue.setHandler(handler);
}
/**
* Add a job to a specific queue and group
* If concurrency is provided and queue doesn't exist, creates it with that concurrency
*/
async add<T>(
queueName: string,
groupId: string,
data: T,
concurrency?: number,
): Promise<void> {
// If concurrency is provided and queue doesn't exist, create with that concurrency
if (concurrency !== undefined && !this.queues.has(queueName)) {
this.queues.set(queueName, new GroupedQueue<T>(concurrency));
}
const queue = this.getQueue<T>(queueName);
return queue.add(groupId, data);
}
/**
* Clear all tasks for a specific group in a queue
*/
clearGroup(queueName: string, groupId: string): void {
const queue = this.queues.get(queueName);
if (queue) {
queue.clearGroup(groupId);
}
}
/**
* Get the number of pending tasks for a group in a queue
*/
getGroupLength(queueName: string, groupId: string): number {
const queue = this.queues.get(queueName);
return queue ? queue.getGroupLength(groupId) : 0;
}
/**
* Get total number of pending tasks across all groups in a queue
*/
getTotalLength(queueName: string): number {
const queue = this.queues.get(queueName);
return queue ? queue.getTotalLength() : 0;
}
/**
* Check if a queue is idle
*/
isIdle(queueName: string): boolean {
const queue = this.queues.get(queueName);
return queue ? queue.isIdle() : true;
}
/**
* Close a specific queue
*/
async closeQueue(queueName: string): Promise<void> {
const queue = this.queues.get(queueName);
if (queue) {
await queue.close();
this.queues.delete(queueName);
}
}
/**
* Close all queues
*/
async closeAll(): Promise<void> {
const promises = Array.from(this.queues.keys()).map((name) =>
this.closeQueue(name),
);
await Promise.all(promises);
}
/**
* Get all queue names
*/
getQueueNames(): string[] {
return Array.from(this.queues.keys());
}
}
// Singleton instance
export const queueManager = new QueueManager();
+95 -29
View File
@@ -1,44 +1,110 @@
import { Queue } from "bullmq";
import { redisConfig } from "./redis-connection";
import { GroupedQueue } from "./grouped-queue-wrapper";
import type { DeploymentJob } from "./queue-types";
const myQueue = new Queue("deployments", {
connection: redisConfig,
});
// In-memory grouped queue: processes one job per group at a time
// Multiple groups can process in parallel (up to concurrency limit)
// Concurrency can be configured via DEPLOYMENT_QUEUE_CONCURRENCY env var (default: 1)
// or dynamically via setConcurrency() function
let DEPLOYMENT_CONCURRENCY = Number.parseInt(
process.env.DEPLOYMENT_QUEUE_CONCURRENCY || "1",
10,
);
process.on("SIGTERM", () => {
myQueue.close();
// Validate concurrency is at least 1
if (DEPLOYMENT_CONCURRENCY < 1) {
DEPLOYMENT_CONCURRENCY = 1;
}
const myQueue = new GroupedQueue<DeploymentJob>(DEPLOYMENT_CONCURRENCY);
// Initialize handler when this module is imported
// Use dynamic import to avoid circular dependency
// The handler will be set when deployments-queue.ts is imported
let handlerInitialized = false;
const initializeHandler = async () => {
if (!handlerInitialized) {
handlerInitialized = true;
// This will set the handler
await import("./deployments-queue");
}
};
// Initialize handler immediately (non-blocking)
void initializeHandler();
process.on("SIGTERM", async () => {
await myQueue.close();
process.exit(0);
});
myQueue.on("error", (error) => {
if ((error as any).code === "ECONNREFUSED") {
console.error(
"Make sure you have installed Redis and it is running.",
error,
);
}
});
export const cleanQueuesByApplication = async (applicationId: string) => {
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
for (const job of jobs) {
if (job?.data?.applicationId === applicationId) {
await job.remove();
console.log(`Removed job ${job.id} for application ${applicationId}`);
}
}
const groupId = `application:${applicationId}`;
myQueue.clearGroup(groupId);
console.log(`Cleared queue for application ${applicationId}`);
};
export const cleanQueuesByCompose = async (composeId: string) => {
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
const groupId = `compose:${composeId}`;
myQueue.clearGroup(groupId);
console.log(`Cleared queue for compose ${composeId}`);
};
for (const job of jobs) {
if (job?.data?.composeId === composeId) {
await job.remove();
console.log(`Removed job ${job.id} for compose ${composeId}`);
/**
* Add a job to the queue without awaiting (fire-and-forget)
* This allows the API to return immediately while the job processes in the background
* Errors are logged but don't block the response
*/
export const addJobAsync = (groupId: string, data: DeploymentJob): void => {
// Fire and forget - don't await, but handle errors
myQueue.add(groupId, data).catch((error) => {
console.error(`Failed to queue job for group ${groupId}:`, error);
});
};
/**
* Get the current deployment queue concurrency
*/
export const getConcurrency = (): number => {
return myQueue.getConcurrency();
};
/**
* Set the deployment queue concurrency dynamically
* This updates the queue's concurrency setting immediately
* WARNING: This will clear all pending builds when concurrency changes
* @returns The number of pending builds that were cleared
*/
export const setConcurrency = (concurrency: number): number => {
if (concurrency < 1) {
throw new Error("Concurrency must be at least 1");
}
const currentConcurrency = myQueue.getConcurrency();
const concurrencyChanged = currentConcurrency !== concurrency;
// Get count of pending tasks before clearing (setConcurrency will clear them)
let clearedCount = 0;
if (concurrencyChanged) {
// Get the count before setConcurrency clears them
clearedCount = myQueue.getTotalLength();
if (process.env.NODE_ENV !== "test") {
console.log(
`Concurrency changing from ${currentConcurrency} to ${concurrency}. Will clear ${clearedCount} pending builds.`,
);
}
}
// Update the stored concurrency value
DEPLOYMENT_CONCURRENCY = concurrency;
// Update the queue's concurrency dynamically (this will clear pending tasks)
myQueue.setConcurrency(concurrency);
if (process.env.NODE_ENV !== "test") {
console.log(`Deployment queue concurrency updated to ${concurrency}`);
}
return clearedCount;
};
export { myQueue };
+3 -1
View File
@@ -4,11 +4,11 @@ import {
createDefaultServerTraefikConfig,
createDefaultTraefikConfig,
IS_CLOUD,
initCancelDeployments,
initCronJobs,
initializeNetwork,
initSchedules,
initVolumeBackupsCronJobs,
initCancelDeployments,
sendDokployRestartNotifications,
setupDirectories,
} from "@dokploy/server";
@@ -66,6 +66,8 @@ void app.prepare().then(async () => {
console.log(`Server Started on: http://${HOST}:${PORT}`);
if (!IS_CLOUD) {
console.log("Starting Deployment Worker");
// Import the handler module to ensure it's initialized
await import("./queues/deployments-queue");
const { deploymentWorker } = await import("./queues/deployments-queue");
await deploymentWorker.run();
}
@@ -97,7 +97,7 @@ docker ${buildArgs.join(" ")} || {
exit 1;
}
echo "✅ Railpack build completed." ;
docker buildx rm builder-containerd
docker buildx rm builder-containerd || true
`;
return bashCommand;