mirror of
https://github.com/dokploy/dokploy.git
synced 2026-06-14 03:19:49 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1379b2118f | |||
| 794cd79973 |
@@ -0,0 +1,809 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { GroupedQueue } from "../../server/queues/grouped-queue-wrapper";
|
||||
|
||||
describe("GroupedQueue", () => {
|
||||
describe("Basic functionality", () => {
|
||||
it("should process a single job with concurrency 1", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
await queue.add("group1", { id: "job1" });
|
||||
|
||||
// Wait for processing to complete
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
expect(processed).toEqual(["job1"]);
|
||||
expect(queue.isIdle()).toBe(true);
|
||||
});
|
||||
|
||||
it("should process jobs in FIFO order within a group", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
// Add multiple jobs to the same group
|
||||
await Promise.all([
|
||||
queue.add("group1", { id: "job1" }),
|
||||
queue.add("group1", { id: "job2" }),
|
||||
queue.add("group1", { id: "job3" }),
|
||||
]);
|
||||
|
||||
// Wait for all processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
expect(processed).toEqual(["job1", "job2", "job3"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Concurrency 1 with multiple groups", () => {
|
||||
it("should process one group at a time with concurrency 1", async () => {
|
||||
const queue = new GroupedQueue<{ id: string; group: string }>(1);
|
||||
const processed: string[] = [];
|
||||
const activeGroups: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
activeGroups.push(data.group);
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
processed.push(data.id);
|
||||
activeGroups.pop();
|
||||
});
|
||||
|
||||
// Add jobs to 3 different groups
|
||||
const promises = [
|
||||
queue.add("app1", { id: "job1", group: "app1" }),
|
||||
queue.add("app2", { id: "job2", group: "app2" }),
|
||||
queue.add("app3", { id: "job3", group: "app3" }),
|
||||
];
|
||||
|
||||
// Check after 30ms - only one should be processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 30));
|
||||
expect(activeGroups.length).toBeLessThanOrEqual(1);
|
||||
|
||||
// Wait for all to complete
|
||||
await Promise.all(promises);
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
expect(processed).toHaveLength(3);
|
||||
expect(queue.isIdle()).toBe(true);
|
||||
});
|
||||
|
||||
it("should process groups sequentially with concurrency 1", async () => {
|
||||
const queue = new GroupedQueue<{ id: string; group: string }>(1);
|
||||
const processingOrder: string[] = [];
|
||||
const startTimes: Map<string, number> = new Map();
|
||||
const endTimes: Map<string, number> = new Map();
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
startTimes.set(data.id, Date.now());
|
||||
processingOrder.push(`start-${data.group}`);
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
endTimes.set(data.id, Date.now());
|
||||
processingOrder.push(`end-${data.group}`);
|
||||
});
|
||||
|
||||
await Promise.all([
|
||||
queue.add("app1", { id: "job1", group: "app1" }),
|
||||
queue.add("app2", { id: "job2", group: "app2" }),
|
||||
queue.add("app3", { id: "job3", group: "app3" }),
|
||||
]);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 300));
|
||||
|
||||
// Verify sequential processing
|
||||
expect(processingOrder).toEqual([
|
||||
"start-app1",
|
||||
"end-app1",
|
||||
"start-app2",
|
||||
"end-app2",
|
||||
"start-app3",
|
||||
"end-app3",
|
||||
]);
|
||||
|
||||
// Verify jobs don't overlap
|
||||
const job1End = endTimes.get("job1")!;
|
||||
const job2Start = startTimes.get("job2")!;
|
||||
const job2End = endTimes.get("job2")!;
|
||||
const job3Start = startTimes.get("job3")!;
|
||||
|
||||
expect(job2Start).toBeGreaterThanOrEqual(job1End);
|
||||
expect(job3Start).toBeGreaterThanOrEqual(job2End);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Concurrency 3 with 4 groups", () => {
|
||||
it("should process up to 3 groups simultaneously", async () => {
|
||||
const queue = new GroupedQueue<{ id: string; group: string }>(3);
|
||||
const activeGroups = new Set<string>();
|
||||
const maxConcurrent = { value: 0 };
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
activeGroups.add(data.group);
|
||||
maxConcurrent.value = Math.max(maxConcurrent.value, activeGroups.size);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
activeGroups.delete(data.group);
|
||||
});
|
||||
|
||||
// Add 4 jobs to different groups
|
||||
await Promise.all([
|
||||
queue.add("app1", { id: "job1", group: "app1" }),
|
||||
queue.add("app2", { id: "job2", group: "app2" }),
|
||||
queue.add("app3", { id: "job3", group: "app3" }),
|
||||
queue.add("app4", { id: "job4", group: "app4" }),
|
||||
]);
|
||||
|
||||
// Check during processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// Should have processed 3 groups simultaneously
|
||||
expect(maxConcurrent.value).toBe(3);
|
||||
expect(activeGroups.size).toBeLessThanOrEqual(3);
|
||||
|
||||
// Wait for all to complete
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
expect(queue.isIdle()).toBe(true);
|
||||
});
|
||||
|
||||
it("should process 4th group after one of the first 3 completes", async () => {
|
||||
const queue = new GroupedQueue<{ id: string; group: string }>(3);
|
||||
const processingOrder: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
processingOrder.push(`start-${data.group}`);
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
processingOrder.push(`end-${data.group}`);
|
||||
});
|
||||
|
||||
await Promise.all([
|
||||
queue.add("app1", { id: "job1", group: "app1" }),
|
||||
queue.add("app2", { id: "job2", group: "app2" }),
|
||||
queue.add("app3", { id: "job3", group: "app3" }),
|
||||
queue.add("app4", { id: "job4", group: "app4" }),
|
||||
]);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 250));
|
||||
|
||||
// First 3 should start together
|
||||
const firstThree = processingOrder.slice(0, 3);
|
||||
expect(firstThree).toContain("start-app1");
|
||||
expect(firstThree).toContain("start-app2");
|
||||
expect(firstThree).toContain("start-app3");
|
||||
|
||||
// 4th should start after one completes
|
||||
const app4StartIndex = processingOrder.indexOf("start-app4");
|
||||
expect(app4StartIndex).toBeGreaterThan(0);
|
||||
expect(app4StartIndex).toBeLessThan(processingOrder.length - 1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Multiple jobs per group", () => {
|
||||
it("should process jobs sequentially within same group", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(3);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 30));
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
// Add 3 jobs to same group
|
||||
await Promise.all([
|
||||
queue.add("app1", { id: "job1" }),
|
||||
queue.add("app1", { id: "job2" }),
|
||||
queue.add("app1", { id: "job3" }),
|
||||
]);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
// Should process in order
|
||||
expect(processed).toEqual(["job1", "job2", "job3"]);
|
||||
});
|
||||
|
||||
it("should process multiple groups with multiple jobs each", async () => {
|
||||
const queue = new GroupedQueue<{ id: string; group: string }>(2);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
processed.push(`${data.group}-${data.id}`);
|
||||
});
|
||||
|
||||
// Add jobs to 2 groups, 2 jobs each
|
||||
await Promise.all([
|
||||
queue.add("app1", { id: "job1", group: "app1" }),
|
||||
queue.add("app1", { id: "job2", group: "app1" }),
|
||||
queue.add("app2", { id: "job1", group: "app2" }),
|
||||
queue.add("app2", { id: "job2", group: "app2" }),
|
||||
]);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
// Should process both groups, jobs within each group in order
|
||||
expect(processed).toHaveLength(4);
|
||||
expect(processed.filter((p) => p.startsWith("app1"))).toEqual([
|
||||
"app1-job1",
|
||||
"app1-job2",
|
||||
]);
|
||||
expect(processed.filter((p) => p.startsWith("app2"))).toEqual([
|
||||
"app2-job1",
|
||||
"app2-job2",
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Error handling", () => {
|
||||
it("should reject job on handler error", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
|
||||
queue.setHandler(async () => {
|
||||
throw new Error("Test error");
|
||||
});
|
||||
|
||||
await expect(queue.add("group1", { id: "job1" })).rejects.toThrow(
|
||||
"Test error",
|
||||
);
|
||||
});
|
||||
|
||||
it("should continue processing other jobs after error", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
if (data.id === "job2") {
|
||||
throw new Error("Job 2 error");
|
||||
}
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
await expect(
|
||||
queue.add("group1", { id: "job1" }),
|
||||
).resolves.toBeUndefined();
|
||||
await expect(queue.add("group1", { id: "job2" })).rejects.toThrow();
|
||||
await expect(
|
||||
queue.add("group1", { id: "job3" }),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
expect(processed).toEqual(["job1", "job3"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Queue management", () => {
|
||||
it("should clear group tasks", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
// Add jobs without awaiting - they'll start processing
|
||||
const job1Promise = queue.add("group1", { id: "job1" });
|
||||
const job2Promise = queue.add("group1", { id: "job2" });
|
||||
|
||||
// Clear immediately - job1 might be processing, but job2 should be cleared
|
||||
queue.clearGroup("group1");
|
||||
|
||||
// Use Promise.allSettled to handle both promises properly
|
||||
const results = await Promise.allSettled([job1Promise, job2Promise]);
|
||||
|
||||
// job1 might succeed or fail depending on timing
|
||||
// job2 should be rejected
|
||||
const job2Result = results[1];
|
||||
if (job2Result.status === "rejected") {
|
||||
expect(job2Result.reason.message).toBe("Queue cleared");
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
// Job1 might have processed, but job2 should not
|
||||
expect(processed.length).toBeLessThanOrEqual(1);
|
||||
});
|
||||
|
||||
it("should return correct group length", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
|
||||
queue.setHandler(async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
});
|
||||
|
||||
// Add jobs without awaiting - check length immediately
|
||||
const promises = [
|
||||
queue.add("group1", { id: "job1" }),
|
||||
queue.add("group1", { id: "job2" }),
|
||||
queue.add("group1", { id: "job3" }),
|
||||
];
|
||||
|
||||
// Check length immediately - at least some should be pending
|
||||
// (job1 might be processing, but job2 and job3 should be pending)
|
||||
const length = queue.getGroupLength("group1");
|
||||
expect(length).toBeGreaterThanOrEqual(0);
|
||||
|
||||
// Wait for all to complete
|
||||
await Promise.all(promises);
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// After processing should be 0
|
||||
expect(queue.getGroupLength("group1")).toBe(0);
|
||||
});
|
||||
|
||||
it("should close queue and reject pending tasks", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
|
||||
queue.setHandler(async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
});
|
||||
|
||||
// Add first job and wait a bit to ensure it starts processing
|
||||
const job1Promise = queue.add("group1", { id: "job1" });
|
||||
// Add second job without awaiting
|
||||
const job2Promise = queue.add("group1", { id: "job2" });
|
||||
|
||||
// Wait a tiny bit to ensure job2 is queued
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Close queue - job2 should be rejected
|
||||
await queue.close();
|
||||
|
||||
// Use Promise.allSettled to handle both promises properly
|
||||
const results = await Promise.allSettled([job1Promise, job2Promise]);
|
||||
|
||||
// job1 might succeed or fail depending on timing
|
||||
// job2 should be rejected
|
||||
const job2Result = results[1];
|
||||
if (job2Result.status === "rejected") {
|
||||
expect(job2Result.reason.message).toBe("Queue closed");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("Concurrency edge cases", () => {
|
||||
it("should handle concurrency 1 with 1 app correctly", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
await queue.add("app1", { id: "job1" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
expect(processed).toEqual(["job1"]);
|
||||
expect(queue.getActiveGroupsCount()).toBe(0);
|
||||
});
|
||||
|
||||
it("should handle concurrency 1 with 3 apps correctly", async () => {
|
||||
const queue = new GroupedQueue<{ id: string; app: string }>(1);
|
||||
const processingTimes: Map<string, { start: number; end: number }> =
|
||||
new Map();
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
const start = Date.now();
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
const end = Date.now();
|
||||
processingTimes.set(data.app, { start, end });
|
||||
});
|
||||
|
||||
await Promise.all([
|
||||
queue.add("app1", { id: "job1", app: "app1" }),
|
||||
queue.add("app2", { id: "job2", app: "app2" }),
|
||||
queue.add("app3", { id: "job3", app: "app3" }),
|
||||
]);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 300));
|
||||
|
||||
// Verify sequential processing
|
||||
const app1 = processingTimes.get("app1")!;
|
||||
const app2 = processingTimes.get("app2")!;
|
||||
const app3 = processingTimes.get("app3")!;
|
||||
|
||||
expect(app2.start).toBeGreaterThanOrEqual(app1.end);
|
||||
expect(app3.start).toBeGreaterThanOrEqual(app2.end);
|
||||
expect(queue.getActiveGroupsCount()).toBe(0);
|
||||
});
|
||||
|
||||
it("should handle 4 apps with concurrency 3 correctly", async () => {
|
||||
const queue = new GroupedQueue<{ id: string; app: string }>(3);
|
||||
const concurrentCounts: number[] = [];
|
||||
|
||||
queue.setHandler(async () => {
|
||||
// Track concurrent processing
|
||||
const interval = setInterval(() => {
|
||||
concurrentCounts.push(queue.getActiveGroupsCount());
|
||||
}, 10);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
clearInterval(interval);
|
||||
});
|
||||
|
||||
await Promise.all([
|
||||
queue.add("app1", { id: "job1", app: "app1" }),
|
||||
queue.add("app2", { id: "job2", app: "app2" }),
|
||||
queue.add("app3", { id: "job3", app: "app3" }),
|
||||
queue.add("app4", { id: "job4", app: "app4" }),
|
||||
]);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
// Should never exceed concurrency of 3
|
||||
const maxConcurrent = Math.max(...concurrentCounts);
|
||||
expect(maxConcurrent).toBeLessThanOrEqual(3);
|
||||
expect(queue.getActiveGroupsCount()).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Idle state", () => {
|
||||
it("should be idle when no jobs are processing", () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
expect(queue.isIdle()).toBe(true);
|
||||
});
|
||||
|
||||
it("should not be idle while processing", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
let isIdleDuringProcessing = false;
|
||||
|
||||
queue.setHandler(async () => {
|
||||
isIdleDuringProcessing = queue.isIdle();
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
});
|
||||
|
||||
await queue.add("group1", { id: "job1" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 30));
|
||||
|
||||
expect(isIdleDuringProcessing).toBe(false);
|
||||
expect(queue.isIdle()).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Concurrency management", () => {
|
||||
it("should get current concurrency", () => {
|
||||
const queue1 = new GroupedQueue<{ id: string }>(1);
|
||||
const queue2 = new GroupedQueue<{ id: string }>(5);
|
||||
const queue3 = new GroupedQueue<{ id: string }>(10);
|
||||
|
||||
expect(queue1.getConcurrency()).toBe(1);
|
||||
expect(queue2.getConcurrency()).toBe(5);
|
||||
expect(queue3.getConcurrency()).toBe(10);
|
||||
});
|
||||
|
||||
it("should set concurrency dynamically", () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
expect(queue.getConcurrency()).toBe(1);
|
||||
|
||||
queue.setConcurrency(3);
|
||||
expect(queue.getConcurrency()).toBe(3);
|
||||
|
||||
queue.setConcurrency(5);
|
||||
expect(queue.getConcurrency()).toBe(5);
|
||||
});
|
||||
|
||||
it("should throw error when setting concurrency less than 1", () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
expect(() => queue.setConcurrency(0)).toThrow(
|
||||
"Concurrency must be at least 1",
|
||||
);
|
||||
expect(() => queue.setConcurrency(-1)).toThrow(
|
||||
"Concurrency must be at least 1",
|
||||
);
|
||||
});
|
||||
|
||||
it("should process next group when concurrency increases", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
// Add jobs to 3 different groups with concurrency 1
|
||||
const job1Promise = queue.add("group1", { id: "job1" });
|
||||
const job2Promise = queue.add("group2", { id: "job2" });
|
||||
const job3Promise = queue.add("group3", { id: "job3" });
|
||||
|
||||
// Wait a bit to ensure job1 starts processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Increase concurrency to 3 - should allow group2 and group3 to start
|
||||
queue.setConcurrency(3);
|
||||
|
||||
// Wait for all to complete
|
||||
await Promise.all([job1Promise, job2Promise, job3Promise]);
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
expect(processed).toHaveLength(3);
|
||||
expect(processed).toContain("job1");
|
||||
expect(processed).toContain("job2");
|
||||
expect(processed).toContain("job3");
|
||||
});
|
||||
});
|
||||
|
||||
describe("Clear all pending tasks", () => {
|
||||
it("should clear all pending tasks across all groups", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
// Add multiple jobs to different groups
|
||||
const job1Promise = queue.add("group1", { id: "job1" });
|
||||
const job2Promise = queue.add("group1", { id: "job2" });
|
||||
const job3Promise = queue.add("group2", { id: "job3" });
|
||||
const job4Promise = queue.add("group2", { id: "job4" });
|
||||
const job5Promise = queue.add("group3", { id: "job5" });
|
||||
|
||||
// Wait a bit to ensure job1 starts processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Clear all pending tasks
|
||||
const clearedCount = queue.clearAllPendingTasks();
|
||||
|
||||
// Should have cleared 4 pending tasks (job2, job3, job4, job5)
|
||||
// job1 is processing so it's not in the queue anymore
|
||||
expect(clearedCount).toBe(4);
|
||||
|
||||
// Handle all promises
|
||||
const results = await Promise.allSettled([
|
||||
job1Promise,
|
||||
job2Promise,
|
||||
job3Promise,
|
||||
job4Promise,
|
||||
job5Promise,
|
||||
]);
|
||||
|
||||
// job1 should succeed (it was processing)
|
||||
const job1Result = results[0];
|
||||
expect(job1Result.status).toBe("fulfilled");
|
||||
|
||||
// All pending jobs should be rejected
|
||||
for (let i = 1; i < results.length; i++) {
|
||||
const result = results[i];
|
||||
if (result && result.status === "rejected") {
|
||||
expect(result.reason.message).toBe(
|
||||
"Concurrency changed - queue cleared",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for job1 to complete
|
||||
await new Promise((resolve) => setTimeout(resolve, 150));
|
||||
|
||||
// Only job1 should have processed
|
||||
expect(processed).toHaveLength(1);
|
||||
expect(processed).toContain("job1");
|
||||
});
|
||||
|
||||
it("should not clear tasks that are currently processing", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
// Add jobs - first one will start processing immediately
|
||||
const job1Promise = queue.add("group1", { id: "job1" });
|
||||
const job2Promise = queue.add("group1", { id: "job2" });
|
||||
|
||||
// Wait to ensure job1 is processing (it's been shifted from tasks)
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
|
||||
// Clear all pending - should only clear job2, not job1
|
||||
// job1 is already executing (not in tasks array)
|
||||
const clearedCount = queue.clearAllPendingTasks();
|
||||
|
||||
expect(clearedCount).toBe(1);
|
||||
|
||||
// Handle all promises
|
||||
const results = await Promise.allSettled([job1Promise, job2Promise]);
|
||||
|
||||
// job1 should succeed (it was processing)
|
||||
const job1Result = results[0];
|
||||
expect(job1Result.status).toBe("fulfilled");
|
||||
|
||||
// job2 should be rejected
|
||||
const job2Result = results[1];
|
||||
if (job2Result && job2Result.status === "rejected") {
|
||||
expect(job2Result.reason.message).toBe(
|
||||
"Concurrency changed - queue cleared",
|
||||
);
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// Only job1 should have processed
|
||||
expect(processed).toHaveLength(1);
|
||||
expect(processed).toContain("job1");
|
||||
});
|
||||
|
||||
it("should return 0 when no pending tasks", () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
const clearedCount = queue.clearAllPendingTasks();
|
||||
expect(clearedCount).toBe(0);
|
||||
});
|
||||
|
||||
it("should clear tasks from multiple groups", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
// Add jobs to multiple groups
|
||||
const promises = [
|
||||
queue.add("group1", { id: "job1" }),
|
||||
queue.add("group1", { id: "job2" }),
|
||||
queue.add("group2", { id: "job3" }),
|
||||
queue.add("group2", { id: "job4" }),
|
||||
queue.add("group3", { id: "job5" }),
|
||||
];
|
||||
|
||||
// Wait a bit for first job to start (it gets shifted from tasks)
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Clear all pending
|
||||
const clearedCount = queue.clearAllPendingTasks();
|
||||
|
||||
// Should clear 4 tasks (job2, job3, job4, job5)
|
||||
// job1 is processing so it's not in the queue anymore
|
||||
expect(clearedCount).toBe(4);
|
||||
|
||||
// Handle all promises
|
||||
const results = await Promise.allSettled(promises);
|
||||
|
||||
// job1 should succeed
|
||||
const job1Result = results[0];
|
||||
expect(job1Result?.status).toBe("fulfilled");
|
||||
|
||||
// Others should be rejected
|
||||
for (let i = 1; i < results.length; i++) {
|
||||
const result = results[i];
|
||||
if (result && result.status === "rejected") {
|
||||
expect(result.reason.message).toBe(
|
||||
"Concurrency changed - queue cleared",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
// Only first job should process
|
||||
expect(processed.length).toBeLessThanOrEqual(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Concurrency change with pending tasks", () => {
|
||||
it("should clear pending tasks when concurrency changes", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
// Add jobs with concurrency 1
|
||||
const job1Promise = queue.add("group1", { id: "job1" });
|
||||
const job2Promise = queue.add("group1", { id: "job2" });
|
||||
const job3Promise = queue.add("group2", { id: "job3" });
|
||||
|
||||
// Wait for job1 to start processing (it gets shifted from tasks)
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Change concurrency - should clear pending tasks via clearAllPendingTasks
|
||||
queue.setConcurrency(3);
|
||||
|
||||
// Handle all promises
|
||||
const results = await Promise.allSettled([
|
||||
job1Promise,
|
||||
job2Promise,
|
||||
job3Promise,
|
||||
]);
|
||||
|
||||
// job1 should succeed (it was processing)
|
||||
const job1Result = results[0];
|
||||
expect(job1Result.status).toBe("fulfilled");
|
||||
|
||||
// Pending jobs should be rejected (job2 and job3 were in queue when cleared)
|
||||
const job2Result = results[1];
|
||||
const job3Result = results[2];
|
||||
|
||||
// At least one of the pending jobs should be rejected
|
||||
const rejectedCount = [job2Result, job3Result].filter(
|
||||
(r) => r && r.status === "rejected",
|
||||
).length;
|
||||
expect(rejectedCount).toBeGreaterThan(0);
|
||||
|
||||
// Verify rejection messages
|
||||
if (job2Result && job2Result.status === "rejected") {
|
||||
expect(job2Result.reason.message).toBe(
|
||||
"Concurrency changed - queue cleared",
|
||||
);
|
||||
}
|
||||
if (job3Result && job3Result.status === "rejected") {
|
||||
expect(job3Result.reason.message).toBe(
|
||||
"Concurrency changed - queue cleared",
|
||||
);
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
// job1 should have processed, others may or may not depending on timing
|
||||
expect(processed.length).toBeGreaterThanOrEqual(1);
|
||||
expect(processed).toContain("job1");
|
||||
});
|
||||
|
||||
it("should allow new jobs after concurrency change", async () => {
|
||||
const queue = new GroupedQueue<{ id: string }>(1);
|
||||
const processed: string[] = [];
|
||||
|
||||
queue.setHandler(async (data) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
// Add job with concurrency 1
|
||||
const job1Promise = queue.add("group1", { id: "job1" });
|
||||
const job2Promise = queue.add("group1", { id: "job2" });
|
||||
|
||||
// Wait for job1 to start (it gets shifted from tasks)
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Change concurrency to 3 - this calls clearAllPendingTasks internally
|
||||
queue.setConcurrency(3);
|
||||
|
||||
// Handle all promises
|
||||
const results = await Promise.allSettled([job1Promise, job2Promise]);
|
||||
|
||||
// job1 should succeed (it was processing)
|
||||
const job1Result = results[0];
|
||||
expect(job1Result.status).toBe("fulfilled");
|
||||
|
||||
// job2 should be rejected (it was in queue when cleared)
|
||||
const job2Result = results[1];
|
||||
if (job2Result && job2Result.status === "rejected") {
|
||||
expect(job2Result.reason.message).toBe(
|
||||
"Concurrency changed - queue cleared",
|
||||
);
|
||||
} else {
|
||||
// If job2 wasn't rejected, it means it started processing before clear
|
||||
// This is acceptable as it's a timing issue
|
||||
}
|
||||
|
||||
// Add new jobs after concurrency change - they should work
|
||||
await Promise.all([
|
||||
queue.add("group2", { id: "job3" }),
|
||||
queue.add("group3", { id: "job4" }),
|
||||
]);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
// job1, job3, and job4 should have processed
|
||||
expect(processed.length).toBeGreaterThanOrEqual(2);
|
||||
expect(processed).toContain("job1");
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,313 @@
|
||||
import { beforeEach, describe, expect, it } from "vitest";
|
||||
import { QueueManager } from "../../server/queues/queue-manager";
|
||||
|
||||
describe("QueueManager", () => {
|
||||
let manager: QueueManager;
|
||||
|
||||
beforeEach(() => {
|
||||
manager = new QueueManager();
|
||||
});
|
||||
|
||||
describe("Queue creation and retrieval", () => {
|
||||
it("should create a queue with default concurrency 1", () => {
|
||||
const queue = manager.getQueue("test-queue");
|
||||
expect(queue.getConcurrency()).toBe(1);
|
||||
});
|
||||
|
||||
it("should create a queue with custom concurrency", () => {
|
||||
const queue = manager.getQueue("test-queue", 5);
|
||||
expect(queue.getConcurrency()).toBe(5);
|
||||
});
|
||||
|
||||
it("should return the same queue instance for the same name", () => {
|
||||
const queue1 = manager.getQueue("test-queue", 3);
|
||||
const queue2 = manager.getQueue("test-queue", 5);
|
||||
expect(queue1).toBe(queue2);
|
||||
// Concurrency should remain as first set
|
||||
expect(queue1.getConcurrency()).toBe(3);
|
||||
});
|
||||
|
||||
it("should create different queues for different names", () => {
|
||||
const queue1 = manager.getQueue("queue1", 2);
|
||||
const queue2 = manager.getQueue("queue2", 4);
|
||||
expect(queue1).not.toBe(queue2);
|
||||
expect(queue1.getConcurrency()).toBe(2);
|
||||
expect(queue2.getConcurrency()).toBe(4);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Handler management", () => {
|
||||
it("should set handler for a queue", async () => {
|
||||
const processed: string[] = [];
|
||||
|
||||
manager.setHandler("test-queue", async (data: { id: string }) => {
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
await manager.add("test-queue", "group1", { id: "job1" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
expect(processed).toEqual(["job1"]);
|
||||
});
|
||||
|
||||
it("should handle different handlers for different queues", async () => {
|
||||
const queue1Processed: string[] = [];
|
||||
const queue2Processed: string[] = [];
|
||||
|
||||
manager.setHandler("queue1", async (data: { id: string }) => {
|
||||
queue1Processed.push(data.id);
|
||||
});
|
||||
|
||||
manager.setHandler("queue2", async (data: { id: string }) => {
|
||||
queue2Processed.push(data.id);
|
||||
});
|
||||
|
||||
await Promise.all([
|
||||
manager.add("queue1", "group1", { id: "job1" }),
|
||||
manager.add("queue2", "group1", { id: "job2" }),
|
||||
]);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
expect(queue1Processed).toEqual(["job1"]);
|
||||
expect(queue2Processed).toEqual(["job2"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Job management", () => {
|
||||
it("should add jobs to correct queue and group", async () => {
|
||||
const processed: string[] = [];
|
||||
|
||||
manager.setHandler("test-queue", async (data: { id: string }) => {
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
await manager.add("test-queue", "group1", { id: "job1" });
|
||||
await manager.add("test-queue", "group2", { id: "job2" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
expect(processed).toContain("job1");
|
||||
expect(processed).toContain("job2");
|
||||
});
|
||||
|
||||
it("should create queue with concurrency when adding job", async () => {
|
||||
const processed: string[] = [];
|
||||
|
||||
// Create queue with concurrency first (without handler)
|
||||
manager.getQueue("new-queue", 3);
|
||||
|
||||
// Set handler
|
||||
manager.setHandler("new-queue", async (data: { id: string }) => {
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
// Now add job - it should process
|
||||
await manager.add("new-queue", "group1", { id: "job1" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
const queue = manager.getQueue("new-queue");
|
||||
expect(queue.getConcurrency()).toBe(3);
|
||||
expect(processed).toEqual(["job1"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Queue operations", () => {
|
||||
it("should clear group in specific queue", async () => {
|
||||
const processed: string[] = [];
|
||||
|
||||
manager.setHandler("test-queue", async (data: { id: string }) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
processed.push(data.id);
|
||||
});
|
||||
|
||||
// Add jobs but don't await - they'll start processing
|
||||
const job1Promise = manager.add("test-queue", "group1", { id: "job1" });
|
||||
const job2Promise = manager.add("test-queue", "group1", { id: "job2" });
|
||||
|
||||
// Clear immediately - job1 might be processing, but job2 should be cleared
|
||||
manager.clearGroup("test-queue", "group1");
|
||||
|
||||
// Use Promise.allSettled to handle both promises properly
|
||||
const results = await Promise.allSettled([job1Promise, job2Promise]);
|
||||
|
||||
// job1 might succeed or fail depending on timing
|
||||
// job2 should be rejected
|
||||
const job2Result = results[1];
|
||||
if (job2Result.status === "rejected") {
|
||||
expect(job2Result.reason.message).toBe("Queue cleared");
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 150));
|
||||
|
||||
// Job1 might have processed, but job2 should not
|
||||
expect(processed.length).toBeLessThanOrEqual(1);
|
||||
});
|
||||
|
||||
it("should get group length for specific queue", async () => {
|
||||
manager.setHandler("test-queue", async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
});
|
||||
|
||||
// Add jobs without awaiting - check length immediately
|
||||
const job1Promise = manager.add("test-queue", "group1", { id: "job1" });
|
||||
const job2Promise = manager.add("test-queue", "group1", { id: "job2" });
|
||||
|
||||
// Check length immediately - at least one should be pending
|
||||
// (job1 might be processing, but job2 should be pending)
|
||||
const length = manager.getGroupLength("test-queue", "group1");
|
||||
expect(length).toBeGreaterThanOrEqual(0);
|
||||
|
||||
// Wait for both to complete
|
||||
await Promise.all([job1Promise, job2Promise]);
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
expect(manager.getGroupLength("test-queue", "group1")).toBe(0);
|
||||
});
|
||||
|
||||
it("should get total length for specific queue", async () => {
|
||||
manager.setHandler("test-queue", async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
});
|
||||
|
||||
// Add jobs without awaiting - check length immediately
|
||||
const promises = [
|
||||
manager.add("test-queue", "group1", { id: "job1" }),
|
||||
manager.add("test-queue", "group2", { id: "job2" }),
|
||||
manager.add("test-queue", "group3", { id: "job3" }),
|
||||
];
|
||||
|
||||
// Check length immediately - at least some should be pending
|
||||
const length = manager.getTotalLength("test-queue");
|
||||
expect(length).toBeGreaterThanOrEqual(0);
|
||||
|
||||
// Wait for all to complete
|
||||
await Promise.all(promises);
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
expect(manager.getTotalLength("test-queue")).toBe(0);
|
||||
});
|
||||
|
||||
it("should check if queue is idle", async () => {
|
||||
manager.setHandler("test-queue", async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
});
|
||||
|
||||
expect(manager.isIdle("test-queue")).toBe(true);
|
||||
|
||||
await manager.add("test-queue", "group1", { id: "job1" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
expect(manager.isIdle("test-queue")).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Queue lifecycle", () => {
|
||||
it("should close a specific queue", async () => {
|
||||
manager.setHandler("test-queue", async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
});
|
||||
|
||||
// Add first job and wait a bit to ensure it starts processing
|
||||
const job1Promise = manager.add("test-queue", "group1", { id: "job1" });
|
||||
// Add second job without awaiting
|
||||
const job2Promise = manager.add("test-queue", "group1", { id: "job2" });
|
||||
|
||||
// Wait a tiny bit to ensure job2 is queued
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Close queue - job2 should be rejected
|
||||
await manager.closeQueue("test-queue");
|
||||
|
||||
// Use Promise.allSettled to handle both promises properly
|
||||
const results = await Promise.allSettled([job1Promise, job2Promise]);
|
||||
|
||||
// job1 might succeed or fail depending on timing
|
||||
// job2 should be rejected
|
||||
const job2Result = results[1];
|
||||
if (job2Result.status === "rejected") {
|
||||
expect(job2Result.reason.message).toBe("Queue closed");
|
||||
}
|
||||
|
||||
expect(manager.getQueueNames()).not.toContain("test-queue");
|
||||
});
|
||||
|
||||
it("should close all queues", async () => {
|
||||
manager.setHandler("queue1", async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
});
|
||||
manager.setHandler("queue2", async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
});
|
||||
|
||||
await manager.add("queue1", "group1", { id: "job1" });
|
||||
await manager.add("queue2", "group1", { id: "job2" });
|
||||
|
||||
await manager.closeAll();
|
||||
|
||||
expect(manager.getQueueNames()).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("should get all queue names", () => {
|
||||
manager.getQueue("queue1");
|
||||
manager.getQueue("queue2");
|
||||
manager.getQueue("queue3");
|
||||
|
||||
const names = manager.getQueueNames();
|
||||
expect(names).toContain("queue1");
|
||||
expect(names).toContain("queue2");
|
||||
expect(names).toContain("queue3");
|
||||
expect(names).toHaveLength(3);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Multiple queues with different concurrency", () => {
|
||||
it("should handle multiple queues with different concurrency settings", async () => {
|
||||
const queue1Processed: string[] = [];
|
||||
const queue2Processed: string[] = [];
|
||||
|
||||
// Create queues with specific concurrency FIRST, before setting handlers
|
||||
const queue1 = manager.getQueue("queue1", 1);
|
||||
const queue2 = manager.getQueue("queue2", 3);
|
||||
|
||||
// Verify concurrency is set correctly before proceeding
|
||||
expect(queue1.getConcurrency()).toBe(1);
|
||||
expect(queue2.getConcurrency()).toBe(3);
|
||||
|
||||
manager.setHandler("queue1", async (data: { id: string }) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
queue1Processed.push(data.id);
|
||||
});
|
||||
|
||||
manager.setHandler("queue2", async (data: { id: string }) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
queue2Processed.push(data.id);
|
||||
});
|
||||
|
||||
// Queue1 with concurrency 1 (sequential)
|
||||
await Promise.all([
|
||||
manager.add("queue1", "app1", { id: "job1" }),
|
||||
manager.add("queue1", "app2", { id: "job2" }),
|
||||
]);
|
||||
|
||||
// Queue2 with concurrency 3 (parallel)
|
||||
await Promise.all([
|
||||
manager.add("queue2", "app1", { id: "job1" }),
|
||||
manager.add("queue2", "app2", { id: "job2" }),
|
||||
manager.add("queue2", "app3", { id: "job3" }),
|
||||
]);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
expect(queue1Processed).toHaveLength(2);
|
||||
expect(queue2Processed).toHaveLength(3);
|
||||
|
||||
// Verify concurrency settings are still correct
|
||||
expect(manager.getQueue("queue1").getConcurrency()).toBe(1);
|
||||
expect(manager.getQueue("queue2").getConcurrency()).toBe(3);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,250 @@
|
||||
import { beforeEach, describe, expect, it } from "vitest";
|
||||
import type { DeploymentJob } from "../../server/queues/queue-types";
|
||||
import {
|
||||
getConcurrency,
|
||||
myQueue,
|
||||
setConcurrency,
|
||||
} from "../../server/queues/queueSetup";
|
||||
|
||||
describe("queueSetup", () => {
|
||||
beforeEach(() => {
|
||||
// Reset concurrency to default (1) before each test
|
||||
setConcurrency(1);
|
||||
// Clear all pending tasks
|
||||
myQueue.clearAllPendingTasks();
|
||||
});
|
||||
|
||||
describe("getConcurrency", () => {
|
||||
it("should return default concurrency of 1", () => {
|
||||
const concurrency = getConcurrency();
|
||||
expect(concurrency).toBe(1);
|
||||
});
|
||||
|
||||
it("should return current concurrency after setting it", () => {
|
||||
setConcurrency(3);
|
||||
expect(getConcurrency()).toBe(3);
|
||||
|
||||
setConcurrency(5);
|
||||
expect(getConcurrency()).toBe(5);
|
||||
});
|
||||
});
|
||||
|
||||
describe("setConcurrency", () => {
|
||||
it("should set concurrency successfully", () => {
|
||||
const clearedCount = setConcurrency(3);
|
||||
expect(getConcurrency()).toBe(3);
|
||||
expect(clearedCount).toBe(0); // No pending tasks to clear
|
||||
});
|
||||
|
||||
it("should throw error for concurrency less than 1", () => {
|
||||
expect(() => setConcurrency(0)).toThrow("Concurrency must be at least 1");
|
||||
expect(() => setConcurrency(-1)).toThrow(
|
||||
"Concurrency must be at least 1",
|
||||
);
|
||||
});
|
||||
|
||||
it("should return 0 cleared builds when no pending tasks", () => {
|
||||
const clearedCount = setConcurrency(2);
|
||||
expect(clearedCount).toBe(0);
|
||||
expect(getConcurrency()).toBe(2);
|
||||
});
|
||||
|
||||
it("should clear pending builds when concurrency changes", async () => {
|
||||
const processed: string[] = [];
|
||||
|
||||
// Set handler
|
||||
myQueue.setHandler(async (job: DeploymentJob) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
if (job.applicationType === "application") {
|
||||
processed.push(job.applicationId);
|
||||
} else if (job.applicationType === "compose") {
|
||||
processed.push(job.composeId);
|
||||
} else if (job.applicationType === "application-preview") {
|
||||
processed.push(job.previewDeploymentId);
|
||||
}
|
||||
});
|
||||
|
||||
// Add jobs to different groups
|
||||
const job1: DeploymentJob = {
|
||||
applicationId: "app1",
|
||||
titleLog: "Test",
|
||||
descriptionLog: "Test",
|
||||
type: "deploy",
|
||||
applicationType: "application",
|
||||
server: false,
|
||||
};
|
||||
const job2: DeploymentJob = {
|
||||
applicationId: "app2",
|
||||
titleLog: "Test",
|
||||
descriptionLog: "Test",
|
||||
type: "deploy",
|
||||
applicationType: "application",
|
||||
server: false,
|
||||
};
|
||||
const job3: DeploymentJob = {
|
||||
applicationId: "app3",
|
||||
titleLog: "Test",
|
||||
descriptionLog: "Test",
|
||||
type: "deploy",
|
||||
applicationType: "application",
|
||||
server: false,
|
||||
};
|
||||
|
||||
// Add jobs without awaiting
|
||||
const promise1 = myQueue.add("application:app1", job1);
|
||||
const promise2 = myQueue.add("application:app2", job2);
|
||||
const promise3 = myQueue.add("application:app3", job3);
|
||||
|
||||
// Wait for first job to start processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Change concurrency - should clear pending builds
|
||||
const clearedCount = setConcurrency(3);
|
||||
|
||||
// Should have cleared 2 pending builds (app2 and app3)
|
||||
expect(clearedCount).toBe(2);
|
||||
expect(getConcurrency()).toBe(3);
|
||||
|
||||
// Handle all promises - use allSettled to handle both resolved and rejected
|
||||
const results = await Promise.allSettled([promise1, promise2, promise3]);
|
||||
|
||||
// job1 should succeed (it was processing), others should be rejected
|
||||
const job1Result = results[0];
|
||||
if (job1Result.status === "fulfilled") {
|
||||
// Job1 completed successfully
|
||||
}
|
||||
|
||||
// Pending jobs should be rejected
|
||||
const job2Result = results[1];
|
||||
const job3Result = results[2];
|
||||
if (job2Result && job2Result.status === "rejected") {
|
||||
expect(job2Result.reason.message).toBe(
|
||||
"Concurrency changed - queue cleared",
|
||||
);
|
||||
}
|
||||
if (job3Result && job3Result.status === "rejected") {
|
||||
expect(job3Result.reason.message).toBe(
|
||||
"Concurrency changed - queue cleared",
|
||||
);
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 150));
|
||||
|
||||
// Only first job should have processed
|
||||
expect(processed.length).toBeLessThanOrEqual(1);
|
||||
});
|
||||
|
||||
it("should not clear builds when concurrency doesn't change", async () => {
|
||||
// Set to 2
|
||||
setConcurrency(2);
|
||||
expect(getConcurrency()).toBe(2);
|
||||
|
||||
// Set to 2 again - should not clear anything
|
||||
const clearedCount = setConcurrency(2);
|
||||
expect(clearedCount).toBe(0);
|
||||
expect(getConcurrency()).toBe(2);
|
||||
});
|
||||
|
||||
it("should allow new jobs after concurrency change", async () => {
|
||||
const processed: string[] = [];
|
||||
|
||||
myQueue.setHandler(async (job: DeploymentJob) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
if (job.applicationType === "application") {
|
||||
processed.push(job.applicationId);
|
||||
}
|
||||
});
|
||||
|
||||
// Add job with concurrency 1
|
||||
const job1: DeploymentJob = {
|
||||
applicationId: "app1",
|
||||
titleLog: "Test",
|
||||
descriptionLog: "Test",
|
||||
type: "deploy",
|
||||
applicationType: "application",
|
||||
server: false,
|
||||
};
|
||||
const job2: DeploymentJob = {
|
||||
applicationId: "app2",
|
||||
titleLog: "Test",
|
||||
descriptionLog: "Test",
|
||||
type: "deploy",
|
||||
applicationType: "application",
|
||||
server: false,
|
||||
};
|
||||
|
||||
const promise1 = myQueue.add("application:app1", job1);
|
||||
const promise2 = myQueue.add("application:app2", job2);
|
||||
|
||||
// Wait for first job to start
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Change concurrency to 3
|
||||
const clearedCount = setConcurrency(3);
|
||||
expect(clearedCount).toBe(1); // app2 should be cleared
|
||||
|
||||
// Handle all promises - use allSettled to handle both resolved and rejected
|
||||
const results = await Promise.allSettled([promise1, promise2]);
|
||||
|
||||
// job1 should succeed (it was processing)
|
||||
const job1Result = results[0];
|
||||
if (job1Result.status === "fulfilled") {
|
||||
// Job1 completed successfully
|
||||
}
|
||||
|
||||
// app2 should be rejected
|
||||
const job2Result = results[1];
|
||||
if (job2Result.status === "rejected") {
|
||||
expect(job2Result.reason.message).toBe(
|
||||
"Concurrency changed - queue cleared",
|
||||
);
|
||||
}
|
||||
|
||||
// Add new jobs after concurrency change - they should work
|
||||
const job3: DeploymentJob = {
|
||||
applicationId: "app3",
|
||||
titleLog: "Test",
|
||||
descriptionLog: "Test",
|
||||
type: "deploy",
|
||||
applicationType: "application",
|
||||
server: false,
|
||||
};
|
||||
const job4: DeploymentJob = {
|
||||
applicationId: "app4",
|
||||
titleLog: "Test",
|
||||
descriptionLog: "Test",
|
||||
type: "deploy",
|
||||
applicationType: "application",
|
||||
server: false,
|
||||
};
|
||||
|
||||
await Promise.all([
|
||||
myQueue.add("application:app3", job3),
|
||||
myQueue.add("application:app4", job4),
|
||||
]);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 150));
|
||||
|
||||
// app1, app3, and app4 should have processed
|
||||
expect(processed.length).toBeGreaterThanOrEqual(2);
|
||||
expect(processed).toContain("app1");
|
||||
});
|
||||
|
||||
it("should handle multiple concurrency changes correctly", () => {
|
||||
// Start at 1
|
||||
expect(getConcurrency()).toBe(1);
|
||||
|
||||
// Change to 3
|
||||
setConcurrency(3);
|
||||
expect(getConcurrency()).toBe(3);
|
||||
|
||||
// Change to 5
|
||||
setConcurrency(5);
|
||||
expect(getConcurrency()).toBe(5);
|
||||
|
||||
// Change back to 1
|
||||
setConcurrency(1);
|
||||
expect(getConcurrency()).toBe(1);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -79,7 +79,7 @@ export const ShowGeneralApplication = ({ applicationId }: Props) => {
|
||||
>
|
||||
<Button
|
||||
variant="default"
|
||||
isLoading={data?.applicationStatus === "running"}
|
||||
// isLoading={data?.applicationStatus === "running"}
|
||||
className="flex items-center gap-1.5 group focus-visible:ring-2 focus-visible:ring-offset-2"
|
||||
>
|
||||
<Tooltip>
|
||||
|
||||
@@ -15,6 +15,7 @@ import { api } from "@/utils/api";
|
||||
import { ShowModalLogs } from "../../web-server/show-modal-logs";
|
||||
import { TerminalModal } from "../../web-server/terminal-modal";
|
||||
import { GPUSupportModal } from "../gpu-support-modal";
|
||||
import { ChangeConcurrencyModal } from "../change-concurrency-modal";
|
||||
|
||||
export const ShowDokployActions = () => {
|
||||
const { t } = useTranslation("settings");
|
||||
@@ -101,6 +102,14 @@ export const ShowDokployActions = () => {
|
||||
>
|
||||
Reload Redis
|
||||
</DropdownMenuItem>
|
||||
<ChangeConcurrencyModal>
|
||||
<DropdownMenuItem
|
||||
className="cursor-pointer"
|
||||
onSelect={(e) => e.preventDefault()}
|
||||
>
|
||||
Change Concurrency
|
||||
</DropdownMenuItem>
|
||||
</ChangeConcurrencyModal>
|
||||
</DropdownMenuGroup>
|
||||
</DropdownMenuContent>
|
||||
</DropdownMenu>
|
||||
|
||||
@@ -7,9 +7,11 @@ import {
|
||||
DialogTrigger,
|
||||
} from "@/components/ui/dialog";
|
||||
import { DropdownMenuItem } from "@/components/ui/dropdown-menu";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { ShowStorageActions } from "./show-storage-actions";
|
||||
import { ShowTraefikActions } from "./show-traefik-actions";
|
||||
import { ToggleDockerCleanup } from "./toggle-docker-cleanup";
|
||||
import { ChangeConcurrencyModal } from "../change-concurrency-modal";
|
||||
|
||||
interface Props {
|
||||
serverId: string;
|
||||
@@ -37,6 +39,16 @@ export const ShowServerActions = ({ serverId }: Props) => {
|
||||
<ShowTraefikActions serverId={serverId} />
|
||||
<ShowStorageActions serverId={serverId} />
|
||||
<ToggleDockerCleanup serverId={serverId} />
|
||||
<div className="col-span-2">
|
||||
<ChangeConcurrencyModal
|
||||
serverId={serverId}
|
||||
trigger={
|
||||
<Button variant="outline" className="w-full">
|
||||
Change Concurrency
|
||||
</Button>
|
||||
}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</DialogContent>
|
||||
</Dialog>
|
||||
|
||||
@@ -0,0 +1,180 @@
|
||||
"use client";
|
||||
|
||||
import { InfoIcon, Loader2 } from "lucide-react";
|
||||
import { useState } from "react";
|
||||
import { toast } from "sonner";
|
||||
import { Alert, AlertDescription } from "@/components/ui/alert";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import {
|
||||
Dialog,
|
||||
DialogContent,
|
||||
DialogDescription,
|
||||
DialogFooter,
|
||||
DialogHeader,
|
||||
DialogTitle,
|
||||
DialogTrigger,
|
||||
} from "@/components/ui/dialog";
|
||||
import { Input } from "@/components/ui/input";
|
||||
import { Label } from "@/components/ui/label";
|
||||
import { api } from "@/utils/api";
|
||||
|
||||
interface Props {
|
||||
serverId?: string;
|
||||
trigger?: React.ReactNode;
|
||||
}
|
||||
|
||||
export const ChangeConcurrencyModal = ({ serverId, trigger }: Props) => {
|
||||
const [isOpen, setIsOpen] = useState(false);
|
||||
const [concurrency, setConcurrency] = useState<number | "">("");
|
||||
|
||||
const { data, isLoading: isLoadingCurrent } =
|
||||
api.settings.getDeploymentConcurrency.useQuery(
|
||||
{ serverId },
|
||||
{
|
||||
enabled: isOpen,
|
||||
onSuccess: (data) => {
|
||||
if (concurrency === "") {
|
||||
setConcurrency(data.concurrency);
|
||||
}
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
const { mutateAsync, isLoading } =
|
||||
api.settings.setDeploymentConcurrency.useMutation();
|
||||
|
||||
const handleSubmit = async (e: React.FormEvent) => {
|
||||
e.preventDefault();
|
||||
if (
|
||||
typeof concurrency !== "number" ||
|
||||
concurrency < 1 ||
|
||||
concurrency > 20
|
||||
) {
|
||||
toast.error("Concurrency must be between 1 and 20");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await mutateAsync({ concurrency, serverId });
|
||||
if (result.clearedBuilds > 0) {
|
||||
toast.warning(
|
||||
`Concurrency updated. ${result.clearedBuilds} pending build${result.clearedBuilds > 1 ? "s were" : " was"} cancelled.`,
|
||||
);
|
||||
} else {
|
||||
toast.success("Concurrency updated successfully");
|
||||
}
|
||||
setIsOpen(false);
|
||||
} catch (error) {
|
||||
toast.error("Failed to update concurrency");
|
||||
}
|
||||
};
|
||||
|
||||
const serverType = serverId ? "Remote Server" : "Dokploy Server";
|
||||
|
||||
return (
|
||||
<Dialog open={isOpen} onOpenChange={setIsOpen}>
|
||||
<DialogTrigger asChild>
|
||||
{trigger || (
|
||||
<Button variant="outline" size="sm">
|
||||
Change Concurrency
|
||||
</Button>
|
||||
)}
|
||||
</DialogTrigger>
|
||||
<DialogContent className="sm:max-w-md">
|
||||
<DialogHeader>
|
||||
<DialogTitle>Deployment Concurrency - {serverType}</DialogTitle>
|
||||
<DialogDescription>
|
||||
Configure how many deployments can run simultaneously on this
|
||||
server.
|
||||
</DialogDescription>
|
||||
</DialogHeader>
|
||||
<form onSubmit={handleSubmit} className="space-y-4">
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="concurrency">Concurrency</Label>
|
||||
<Input
|
||||
id="concurrency"
|
||||
type="number"
|
||||
min={1}
|
||||
max={20}
|
||||
value={concurrency}
|
||||
onChange={(e) => {
|
||||
const value = e.target.value;
|
||||
setConcurrency(value === "" ? "" : Number.parseInt(value, 10));
|
||||
}}
|
||||
placeholder="Enter concurrency (1-20)"
|
||||
disabled={isLoading || isLoadingCurrent}
|
||||
/>
|
||||
{isLoadingCurrent && (
|
||||
<div className="flex items-center gap-2 text-sm text-muted-foreground">
|
||||
<Loader2 className="h-4 w-4 animate-spin" />
|
||||
Loading current concurrency...
|
||||
</div>
|
||||
)}
|
||||
{!isLoadingCurrent && data && (
|
||||
<p className="text-sm text-muted-foreground">
|
||||
Current: {data.concurrency}
|
||||
</p>
|
||||
)}
|
||||
</div>
|
||||
|
||||
<div className="space-y-3">
|
||||
<Alert>
|
||||
<InfoIcon className="h-4 w-4" />
|
||||
<AlertDescription className="text-sm">
|
||||
<div className="space-y-1 mt-1">
|
||||
<p>
|
||||
<strong>Default:</strong> 1 deployment at a time
|
||||
(sequential)
|
||||
</p>
|
||||
<p>
|
||||
<strong>Higher values:</strong> More deployments in
|
||||
parallel, but will use more RAM and CPU resources.
|
||||
</p>
|
||||
{serverId && (
|
||||
<p className="text-muted-foreground text-xs mt-2">
|
||||
This setting applies to deployments on this remote server.
|
||||
</p>
|
||||
)}
|
||||
{!serverId && (
|
||||
<p className="text-muted-foreground text-xs mt-2">
|
||||
This setting applies to deployments on the Dokploy server.
|
||||
</p>
|
||||
)}
|
||||
</div>
|
||||
</AlertDescription>
|
||||
</Alert>
|
||||
<Alert variant="destructive">
|
||||
<InfoIcon className="h-4 w-4" />
|
||||
<AlertDescription className="text-sm font-medium">
|
||||
⚠️ <strong>Warning:</strong> Changing concurrency will cancel all
|
||||
pending builds. Currently running builds will continue, but
|
||||
queued builds will be cancelled.
|
||||
</AlertDescription>
|
||||
</Alert>
|
||||
</div>
|
||||
|
||||
<DialogFooter>
|
||||
<Button
|
||||
type="button"
|
||||
variant="outline"
|
||||
onClick={() => setIsOpen(false)}
|
||||
disabled={isLoading}
|
||||
>
|
||||
Cancel
|
||||
</Button>
|
||||
<Button type="submit" disabled={isLoading || isLoadingCurrent}>
|
||||
{isLoading ? (
|
||||
<>
|
||||
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
|
||||
Updating...
|
||||
</>
|
||||
) : (
|
||||
"Update Concurrency"
|
||||
)}
|
||||
</Button>
|
||||
</DialogFooter>
|
||||
</form>
|
||||
</DialogContent>
|
||||
</Dialog>
|
||||
);
|
||||
};
|
||||
@@ -253,12 +253,8 @@ export default async function handler(
|
||||
return true;
|
||||
}
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
`application:${jobData.applicationId}`,
|
||||
jobData,
|
||||
);
|
||||
} catch (error) {
|
||||
res.status(400).json({ message: "Error deploying Application", error });
|
||||
|
||||
@@ -183,12 +183,8 @@ export default async function handler(
|
||||
return true;
|
||||
}
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
`compose:${jobData.composeId}`,
|
||||
jobData,
|
||||
);
|
||||
} catch (error) {
|
||||
res.status(400).json({ message: "Error deploying Compose", error });
|
||||
|
||||
@@ -132,12 +132,8 @@ export default async function handler(
|
||||
continue;
|
||||
}
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
`application:${jobData.applicationId}`,
|
||||
jobData,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -170,12 +166,8 @@ export default async function handler(
|
||||
}
|
||||
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
`compose:${jobData.composeId}`,
|
||||
jobData,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -250,12 +242,8 @@ export default async function handler(
|
||||
continue;
|
||||
}
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
`application:${jobData.applicationId}`,
|
||||
jobData,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -296,12 +284,8 @@ export default async function handler(
|
||||
}
|
||||
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
`compose:${jobData.composeId}`,
|
||||
jobData,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -495,12 +479,8 @@ export default async function handler(
|
||||
continue;
|
||||
}
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
`preview:${jobData.previewDeploymentId}`,
|
||||
jobData,
|
||||
);
|
||||
}
|
||||
return res.status(200).json({ message: "Apps Deployed" });
|
||||
|
||||
@@ -58,7 +58,10 @@ import {
|
||||
applications,
|
||||
} from "@/server/db/schema";
|
||||
import type { DeploymentJob } from "@/server/queues/queue-types";
|
||||
import { cleanQueuesByApplication, myQueue } from "@/server/queues/queueSetup";
|
||||
import {
|
||||
addJobAsync,
|
||||
cleanQueuesByApplication,
|
||||
} from "@/server/queues/queueSetup";
|
||||
import { cancelDeployment, deploy } from "@/server/utils/deploy";
|
||||
import { uploadFileSchema } from "@/utils/schema";
|
||||
|
||||
@@ -335,14 +338,9 @@ export const applicationRouter = createTRPCRouter({
|
||||
await deploy(jobData);
|
||||
return true;
|
||||
}
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
);
|
||||
// Fire and forget - UI doesn't wait for deployment to complete
|
||||
addJobAsync(`application:${jobData.applicationId}`, jobData);
|
||||
return { success: true, message: "Deployment queued" };
|
||||
}),
|
||||
saveEnvironment: protectedProcedure
|
||||
.input(apiSaveEnvironmentVariables)
|
||||
@@ -700,14 +698,8 @@ export const applicationRouter = createTRPCRouter({
|
||||
|
||||
return true;
|
||||
}
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
);
|
||||
addJobAsync(`application:${jobData.applicationId}`, jobData);
|
||||
return { success: true, message: "Deployment queued" };
|
||||
}),
|
||||
|
||||
cleanQueues: protectedProcedure
|
||||
@@ -798,14 +790,8 @@ export const applicationRouter = createTRPCRouter({
|
||||
return true;
|
||||
}
|
||||
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
);
|
||||
// Fire and forget - UI doesn't wait for deployment to complete
|
||||
addJobAsync(`application:${jobData.applicationId}`, jobData);
|
||||
return true;
|
||||
}),
|
||||
updateTraefikConfig: protectedProcedure
|
||||
|
||||
@@ -59,7 +59,7 @@ import {
|
||||
compose as composeTable,
|
||||
} from "@/server/db/schema";
|
||||
import type { DeploymentJob } from "@/server/queues/queue-types";
|
||||
import { cleanQueuesByCompose, myQueue } from "@/server/queues/queueSetup";
|
||||
import { addJobAsync, cleanQueuesByCompose } from "@/server/queues/queueSetup";
|
||||
import { cancelDeployment, deploy } from "@/server/utils/deploy";
|
||||
import { generatePassword } from "@/templates/utils";
|
||||
import { createTRPCRouter, protectedProcedure, publicProcedure } from "../trpc";
|
||||
@@ -401,14 +401,7 @@ export const composeRouter = createTRPCRouter({
|
||||
await deploy(jobData);
|
||||
return true;
|
||||
}
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
);
|
||||
addJobAsync(`compose:${jobData.composeId}`, jobData);
|
||||
return { success: true, message: "Deployment queued" };
|
||||
}),
|
||||
redeploy: protectedProcedure
|
||||
@@ -437,14 +430,7 @@ export const composeRouter = createTRPCRouter({
|
||||
await deploy(jobData);
|
||||
return true;
|
||||
}
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
);
|
||||
addJobAsync(`compose:${jobData.composeId}`, jobData);
|
||||
return { success: true, message: "Redeployment queued" };
|
||||
}),
|
||||
stop: protectedProcedure
|
||||
|
||||
@@ -862,4 +862,49 @@ export const settingsRouter = createTRPCRouter({
|
||||
const ips = process.env.DOKPLOY_CLOUD_IPS?.split(",");
|
||||
return ips;
|
||||
}),
|
||||
getDeploymentConcurrency: adminProcedure
|
||||
.input(
|
||||
z.object({
|
||||
serverId: z.string().optional(),
|
||||
}),
|
||||
)
|
||||
.query(async ({ input }) => {
|
||||
// For now, remote servers use the same queue as dokploy server
|
||||
// In the future, we could implement per-server queues
|
||||
const { getConcurrency } = await import("@/server/queues/queueSetup");
|
||||
return {
|
||||
concurrency: getConcurrency(),
|
||||
serverId: input.serverId,
|
||||
};
|
||||
}),
|
||||
setDeploymentConcurrency: adminProcedure
|
||||
.input(
|
||||
z.object({
|
||||
concurrency: z.number().int().min(1).max(20),
|
||||
serverId: z.string().optional(),
|
||||
}),
|
||||
)
|
||||
.mutation(async ({ input }) => {
|
||||
// For now, remote servers use the same queue as dokploy server
|
||||
// In the future, we could implement per-server queues
|
||||
const { setConcurrency, getConcurrency } = await import(
|
||||
"@/server/queues/queueSetup"
|
||||
);
|
||||
const currentConcurrency = getConcurrency();
|
||||
const clearedCount = setConcurrency(input.concurrency);
|
||||
const serverType = input.serverId ? "remote server" : "Dokploy server";
|
||||
|
||||
let message = `${serverType} deployment concurrency updated from ${currentConcurrency} to ${input.concurrency}. Changes take effect immediately.`;
|
||||
if (clearedCount > 0) {
|
||||
message += ` ${clearedCount} pending build${clearedCount > 1 ? "s were" : " was"} cancelled due to concurrency change.`;
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message,
|
||||
concurrency: input.concurrency,
|
||||
serverId: input.serverId,
|
||||
clearedBuilds: clearedCount,
|
||||
};
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -8,67 +8,77 @@ import {
|
||||
updateCompose,
|
||||
updatePreviewDeployment,
|
||||
} from "@dokploy/server";
|
||||
import { type Job, Worker } from "bullmq";
|
||||
import type { DeploymentJob } from "./queue-types";
|
||||
import { redisConfig } from "./redis-connection";
|
||||
import { myQueue } from "./queueSetup";
|
||||
|
||||
export const deploymentWorker = new Worker(
|
||||
"deployments",
|
||||
async (job: Job<DeploymentJob>) => {
|
||||
try {
|
||||
if (job.data.applicationType === "application") {
|
||||
await updateApplicationStatus(job.data.applicationId, "running");
|
||||
// Set the handler for processing deployment jobs
|
||||
console.log("Setting deployment queue handler");
|
||||
myQueue.setHandler(async (job: DeploymentJob) => {
|
||||
const jobId =
|
||||
job.applicationType === "application"
|
||||
? job.applicationId
|
||||
: job.applicationType === "compose"
|
||||
? job.composeId
|
||||
: job.previewDeploymentId;
|
||||
console.log("Handler called with job:", job.applicationType, jobId);
|
||||
try {
|
||||
if (job.applicationType === "application") {
|
||||
await updateApplicationStatus(job.applicationId, "running");
|
||||
|
||||
if (job.data.type === "redeploy") {
|
||||
await rebuildApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "deploy") {
|
||||
await deployApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else if (job.data.applicationType === "compose") {
|
||||
await updateCompose(job.data.composeId, {
|
||||
composeStatus: "running",
|
||||
if (job.type === "redeploy") {
|
||||
await rebuildApplication({
|
||||
applicationId: job.applicationId,
|
||||
titleLog: job.titleLog,
|
||||
descriptionLog: job.descriptionLog,
|
||||
});
|
||||
if (job.data.type === "deploy") {
|
||||
await deployCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "redeploy") {
|
||||
await rebuildCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else if (job.data.applicationType === "application-preview") {
|
||||
await updatePreviewDeployment(job.data.previewDeploymentId, {
|
||||
previewStatus: "running",
|
||||
} else if (job.type === "deploy") {
|
||||
await deployApplication({
|
||||
applicationId: job.applicationId,
|
||||
titleLog: job.titleLog,
|
||||
descriptionLog: job.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else if (job.applicationType === "compose") {
|
||||
await updateCompose(job.composeId, {
|
||||
composeStatus: "running",
|
||||
});
|
||||
if (job.type === "deploy") {
|
||||
await deployCompose({
|
||||
composeId: job.composeId,
|
||||
titleLog: job.titleLog,
|
||||
descriptionLog: job.descriptionLog,
|
||||
});
|
||||
} else if (job.type === "redeploy") {
|
||||
await rebuildCompose({
|
||||
composeId: job.composeId,
|
||||
titleLog: job.titleLog,
|
||||
descriptionLog: job.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else if (job.applicationType === "application-preview") {
|
||||
await updatePreviewDeployment(job.previewDeploymentId, {
|
||||
previewStatus: "running",
|
||||
});
|
||||
|
||||
if (job.type === "deploy") {
|
||||
await deployPreviewApplication({
|
||||
applicationId: job.applicationId,
|
||||
titleLog: job.titleLog,
|
||||
descriptionLog: job.descriptionLog,
|
||||
previewDeploymentId: job.previewDeploymentId,
|
||||
});
|
||||
|
||||
if (job.data.type === "deploy") {
|
||||
await deployPreviewApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
previewDeploymentId: job.data.previewDeploymentId,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.log("Error", error);
|
||||
}
|
||||
} catch (error) {
|
||||
console.log("Error processing deployment job", error);
|
||||
throw error; // Re-throw to let the queue handle retries if needed
|
||||
}
|
||||
});
|
||||
|
||||
// Export for compatibility (no longer needed but kept for imports)
|
||||
export const deploymentWorker = {
|
||||
run: () => {
|
||||
// Queue starts processing automatically when jobs are added
|
||||
console.log("Deployment queue handler initialized");
|
||||
},
|
||||
{
|
||||
autorun: false,
|
||||
connection: redisConfig,
|
||||
},
|
||||
);
|
||||
};
|
||||
|
||||
@@ -0,0 +1,256 @@
|
||||
/**
|
||||
* In-memory grouped queue implementation
|
||||
* Each group processes one job at a time (FIFO per group)
|
||||
* Multiple groups can process in parallel
|
||||
*/
|
||||
|
||||
type Task<T> = {
|
||||
data: T;
|
||||
resolve: () => void;
|
||||
reject: (error: Error) => void;
|
||||
};
|
||||
|
||||
type GroupQueue<T> = {
|
||||
tasks: Task<T>[];
|
||||
processing: boolean;
|
||||
};
|
||||
|
||||
export class GroupedQueue<T> {
|
||||
private groups: Map<string, GroupQueue<T>> = new Map();
|
||||
private handler?: (data: T) => Promise<void>;
|
||||
private concurrency: number;
|
||||
private activeGroups: Set<string> = new Set();
|
||||
|
||||
constructor(concurrency = 4) {
|
||||
this.concurrency = concurrency;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the handler function that processes each job
|
||||
*/
|
||||
setHandler(handler: (data: T) => Promise<void>) {
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a job to a group queue
|
||||
*/
|
||||
async add(groupId: string, data: T): Promise<void> {
|
||||
if (process.env.NODE_ENV !== "test") {
|
||||
console.log(
|
||||
`Adding job to group ${groupId}, handler set: ${!!this.handler}`,
|
||||
);
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!this.groups.has(groupId)) {
|
||||
this.groups.set(groupId, {
|
||||
tasks: [],
|
||||
processing: false,
|
||||
});
|
||||
}
|
||||
|
||||
const group = this.groups.get(groupId)!;
|
||||
group.tasks.push({
|
||||
data,
|
||||
resolve,
|
||||
reject,
|
||||
});
|
||||
|
||||
// Start processing if not already processing and under concurrency limit
|
||||
if (!group.processing && this.activeGroups.size < this.concurrency) {
|
||||
this.processGroup(groupId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Process jobs in a group queue
|
||||
*/
|
||||
private async processGroup(groupId: string): Promise<void> {
|
||||
const group = this.groups.get(groupId);
|
||||
if (!group || group.processing) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait for handler to be set if not available
|
||||
if (!this.handler) {
|
||||
if (process.env.NODE_ENV !== "test") {
|
||||
console.log(`Handler not set yet for group ${groupId}, waiting...`);
|
||||
}
|
||||
// Retry after a short delay
|
||||
setTimeout(() => {
|
||||
if (this.handler && group.tasks.length > 0) {
|
||||
this.processGroup(groupId);
|
||||
}
|
||||
}, 100);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check concurrency limit
|
||||
if (this.activeGroups.size >= this.concurrency) {
|
||||
return;
|
||||
}
|
||||
|
||||
group.processing = true;
|
||||
this.activeGroups.add(groupId);
|
||||
if (process.env.NODE_ENV !== "test") {
|
||||
console.log(`Processing group ${groupId}, tasks: ${group.tasks.length}`);
|
||||
}
|
||||
|
||||
while (group.tasks.length > 0) {
|
||||
const task = group.tasks.shift()!;
|
||||
|
||||
try {
|
||||
if (process.env.NODE_ENV !== "test") {
|
||||
console.log(`Executing handler for group ${groupId}`);
|
||||
}
|
||||
await this.handler!(task.data);
|
||||
task.resolve();
|
||||
if (process.env.NODE_ENV !== "test") {
|
||||
console.log(`Handler completed for group ${groupId}`);
|
||||
}
|
||||
} catch (error) {
|
||||
if (process.env.NODE_ENV !== "test") {
|
||||
console.error(`Handler error for group ${groupId}:`, error);
|
||||
}
|
||||
task.reject(error instanceof Error ? error : new Error(String(error)));
|
||||
}
|
||||
}
|
||||
|
||||
group.processing = false;
|
||||
this.activeGroups.delete(groupId);
|
||||
|
||||
// Try to process another group if there are waiting groups
|
||||
this.processNextGroup();
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the next available group
|
||||
*/
|
||||
private processNextGroup(): void {
|
||||
if (this.activeGroups.size >= this.concurrency) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Find a group with pending tasks that's not currently processing
|
||||
for (const [groupId, group] of this.groups.entries()) {
|
||||
if (
|
||||
!group.processing &&
|
||||
group.tasks.length > 0 &&
|
||||
!this.activeGroups.has(groupId)
|
||||
) {
|
||||
this.processGroup(groupId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove all tasks for a specific group
|
||||
*/
|
||||
clearGroup(groupId: string): void {
|
||||
const group = this.groups.get(groupId);
|
||||
if (group) {
|
||||
// Reject all pending tasks
|
||||
for (const task of group.tasks) {
|
||||
task.reject(new Error("Queue cleared"));
|
||||
}
|
||||
group.tasks = [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all pending tasks across all groups
|
||||
* This is useful when changing concurrency settings
|
||||
* Note: This only clears tasks in the queue, not the currently executing task
|
||||
*/
|
||||
clearAllPendingTasks(): number {
|
||||
let clearedCount = 0;
|
||||
for (const [groupId, group] of this.groups.entries()) {
|
||||
// Clear all pending tasks in the queue
|
||||
// The currently executing task is not in group.tasks (it was already shifted)
|
||||
if (group.tasks.length > 0) {
|
||||
clearedCount += group.tasks.length;
|
||||
for (const task of group.tasks) {
|
||||
task.reject(new Error("Concurrency changed - queue cleared"));
|
||||
}
|
||||
group.tasks = [];
|
||||
}
|
||||
}
|
||||
return clearedCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of pending tasks for a group
|
||||
*/
|
||||
getGroupLength(groupId: string): number {
|
||||
return this.groups.get(groupId)?.tasks.length ?? 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get total number of pending tasks across all groups
|
||||
*/
|
||||
getTotalLength(): number {
|
||||
let total = 0;
|
||||
for (const group of this.groups.values()) {
|
||||
total += group.tasks.length;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if queue is idle (no active processing)
|
||||
*/
|
||||
isIdle(): boolean {
|
||||
return this.activeGroups.size === 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of active groups (for testing)
|
||||
*/
|
||||
getActiveGroupsCount(): number {
|
||||
return this.activeGroups.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the concurrency limit
|
||||
*/
|
||||
getConcurrency(): number {
|
||||
return this.concurrency;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the concurrency limit dynamically
|
||||
* This allows changing concurrency without recreating the queue
|
||||
* WARNING: This will clear all pending tasks when concurrency changes
|
||||
*/
|
||||
setConcurrency(concurrency: number): void {
|
||||
if (concurrency < 1) {
|
||||
throw new Error("Concurrency must be at least 1");
|
||||
}
|
||||
const concurrencyChanged = this.concurrency !== concurrency;
|
||||
this.concurrency = concurrency;
|
||||
|
||||
// If concurrency changed, clear all pending tasks
|
||||
if (concurrencyChanged) {
|
||||
this.clearAllPendingTasks();
|
||||
}
|
||||
|
||||
// Process next group if we now have capacity
|
||||
this.processNextGroup();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the queue and reject all pending tasks
|
||||
*/
|
||||
async close(): Promise<void> {
|
||||
for (const [groupId, group] of this.groups.entries()) {
|
||||
for (const task of group.tasks) {
|
||||
task.reject(new Error("Queue closed"));
|
||||
}
|
||||
group.tasks = [];
|
||||
}
|
||||
this.groups.clear();
|
||||
this.activeGroups.clear();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
/**
|
||||
* Queue Manager - Manages multiple dynamic queues
|
||||
* Each queue can have its own concurrency configuration
|
||||
*/
|
||||
|
||||
import { GroupedQueue } from "./grouped-queue-wrapper";
|
||||
|
||||
export class QueueManager {
|
||||
private queues: Map<string, GroupedQueue<any>> = new Map();
|
||||
|
||||
/**
|
||||
* Get or create a queue with the specified name and concurrency
|
||||
* Note: If queue already exists, concurrency parameter is ignored
|
||||
*/
|
||||
getQueue<T>(name: string, concurrency = 1): GroupedQueue<T> {
|
||||
if (!this.queues.has(name)) {
|
||||
this.queues.set(name, new GroupedQueue<T>(concurrency));
|
||||
}
|
||||
return this.queues.get(name) as GroupedQueue<T>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set handler for a specific queue
|
||||
*/
|
||||
setHandler<T>(queueName: string, handler: (data: T) => Promise<void>): void {
|
||||
const queue = this.getQueue<T>(queueName);
|
||||
queue.setHandler(handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a job to a specific queue and group
|
||||
* If concurrency is provided and queue doesn't exist, creates it with that concurrency
|
||||
*/
|
||||
async add<T>(
|
||||
queueName: string,
|
||||
groupId: string,
|
||||
data: T,
|
||||
concurrency?: number,
|
||||
): Promise<void> {
|
||||
// If concurrency is provided and queue doesn't exist, create with that concurrency
|
||||
if (concurrency !== undefined && !this.queues.has(queueName)) {
|
||||
this.queues.set(queueName, new GroupedQueue<T>(concurrency));
|
||||
}
|
||||
const queue = this.getQueue<T>(queueName);
|
||||
return queue.add(groupId, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all tasks for a specific group in a queue
|
||||
*/
|
||||
clearGroup(queueName: string, groupId: string): void {
|
||||
const queue = this.queues.get(queueName);
|
||||
if (queue) {
|
||||
queue.clearGroup(groupId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of pending tasks for a group in a queue
|
||||
*/
|
||||
getGroupLength(queueName: string, groupId: string): number {
|
||||
const queue = this.queues.get(queueName);
|
||||
return queue ? queue.getGroupLength(groupId) : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get total number of pending tasks across all groups in a queue
|
||||
*/
|
||||
getTotalLength(queueName: string): number {
|
||||
const queue = this.queues.get(queueName);
|
||||
return queue ? queue.getTotalLength() : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a queue is idle
|
||||
*/
|
||||
isIdle(queueName: string): boolean {
|
||||
const queue = this.queues.get(queueName);
|
||||
return queue ? queue.isIdle() : true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close a specific queue
|
||||
*/
|
||||
async closeQueue(queueName: string): Promise<void> {
|
||||
const queue = this.queues.get(queueName);
|
||||
if (queue) {
|
||||
await queue.close();
|
||||
this.queues.delete(queueName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all queues
|
||||
*/
|
||||
async closeAll(): Promise<void> {
|
||||
const promises = Array.from(this.queues.keys()).map((name) =>
|
||||
this.closeQueue(name),
|
||||
);
|
||||
await Promise.all(promises);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all queue names
|
||||
*/
|
||||
getQueueNames(): string[] {
|
||||
return Array.from(this.queues.keys());
|
||||
}
|
||||
}
|
||||
|
||||
// Singleton instance
|
||||
export const queueManager = new QueueManager();
|
||||
@@ -1,44 +1,110 @@
|
||||
import { Queue } from "bullmq";
|
||||
import { redisConfig } from "./redis-connection";
|
||||
import { GroupedQueue } from "./grouped-queue-wrapper";
|
||||
import type { DeploymentJob } from "./queue-types";
|
||||
|
||||
const myQueue = new Queue("deployments", {
|
||||
connection: redisConfig,
|
||||
});
|
||||
// In-memory grouped queue: processes one job per group at a time
|
||||
// Multiple groups can process in parallel (up to concurrency limit)
|
||||
// Concurrency can be configured via DEPLOYMENT_QUEUE_CONCURRENCY env var (default: 1)
|
||||
// or dynamically via setConcurrency() function
|
||||
let DEPLOYMENT_CONCURRENCY = Number.parseInt(
|
||||
process.env.DEPLOYMENT_QUEUE_CONCURRENCY || "1",
|
||||
10,
|
||||
);
|
||||
|
||||
process.on("SIGTERM", () => {
|
||||
myQueue.close();
|
||||
// Validate concurrency is at least 1
|
||||
if (DEPLOYMENT_CONCURRENCY < 1) {
|
||||
DEPLOYMENT_CONCURRENCY = 1;
|
||||
}
|
||||
|
||||
const myQueue = new GroupedQueue<DeploymentJob>(DEPLOYMENT_CONCURRENCY);
|
||||
|
||||
// Initialize handler when this module is imported
|
||||
// Use dynamic import to avoid circular dependency
|
||||
// The handler will be set when deployments-queue.ts is imported
|
||||
let handlerInitialized = false;
|
||||
const initializeHandler = async () => {
|
||||
if (!handlerInitialized) {
|
||||
handlerInitialized = true;
|
||||
// This will set the handler
|
||||
await import("./deployments-queue");
|
||||
}
|
||||
};
|
||||
|
||||
// Initialize handler immediately (non-blocking)
|
||||
void initializeHandler();
|
||||
|
||||
process.on("SIGTERM", async () => {
|
||||
await myQueue.close();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
myQueue.on("error", (error) => {
|
||||
if ((error as any).code === "ECONNREFUSED") {
|
||||
console.error(
|
||||
"Make sure you have installed Redis and it is running.",
|
||||
error,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
export const cleanQueuesByApplication = async (applicationId: string) => {
|
||||
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
|
||||
|
||||
for (const job of jobs) {
|
||||
if (job?.data?.applicationId === applicationId) {
|
||||
await job.remove();
|
||||
console.log(`Removed job ${job.id} for application ${applicationId}`);
|
||||
}
|
||||
}
|
||||
const groupId = `application:${applicationId}`;
|
||||
myQueue.clearGroup(groupId);
|
||||
console.log(`Cleared queue for application ${applicationId}`);
|
||||
};
|
||||
|
||||
export const cleanQueuesByCompose = async (composeId: string) => {
|
||||
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
|
||||
const groupId = `compose:${composeId}`;
|
||||
myQueue.clearGroup(groupId);
|
||||
console.log(`Cleared queue for compose ${composeId}`);
|
||||
};
|
||||
|
||||
for (const job of jobs) {
|
||||
if (job?.data?.composeId === composeId) {
|
||||
await job.remove();
|
||||
console.log(`Removed job ${job.id} for compose ${composeId}`);
|
||||
/**
|
||||
* Add a job to the queue without awaiting (fire-and-forget)
|
||||
* This allows the API to return immediately while the job processes in the background
|
||||
* Errors are logged but don't block the response
|
||||
*/
|
||||
export const addJobAsync = (groupId: string, data: DeploymentJob): void => {
|
||||
// Fire and forget - don't await, but handle errors
|
||||
myQueue.add(groupId, data).catch((error) => {
|
||||
console.error(`Failed to queue job for group ${groupId}:`, error);
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* Get the current deployment queue concurrency
|
||||
*/
|
||||
export const getConcurrency = (): number => {
|
||||
return myQueue.getConcurrency();
|
||||
};
|
||||
|
||||
/**
|
||||
* Set the deployment queue concurrency dynamically
|
||||
* This updates the queue's concurrency setting immediately
|
||||
* WARNING: This will clear all pending builds when concurrency changes
|
||||
* @returns The number of pending builds that were cleared
|
||||
*/
|
||||
export const setConcurrency = (concurrency: number): number => {
|
||||
if (concurrency < 1) {
|
||||
throw new Error("Concurrency must be at least 1");
|
||||
}
|
||||
|
||||
const currentConcurrency = myQueue.getConcurrency();
|
||||
const concurrencyChanged = currentConcurrency !== concurrency;
|
||||
|
||||
// Get count of pending tasks before clearing (setConcurrency will clear them)
|
||||
let clearedCount = 0;
|
||||
if (concurrencyChanged) {
|
||||
// Get the count before setConcurrency clears them
|
||||
clearedCount = myQueue.getTotalLength();
|
||||
if (process.env.NODE_ENV !== "test") {
|
||||
console.log(
|
||||
`Concurrency changing from ${currentConcurrency} to ${concurrency}. Will clear ${clearedCount} pending builds.`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Update the stored concurrency value
|
||||
DEPLOYMENT_CONCURRENCY = concurrency;
|
||||
|
||||
// Update the queue's concurrency dynamically (this will clear pending tasks)
|
||||
myQueue.setConcurrency(concurrency);
|
||||
|
||||
if (process.env.NODE_ENV !== "test") {
|
||||
console.log(`Deployment queue concurrency updated to ${concurrency}`);
|
||||
}
|
||||
|
||||
return clearedCount;
|
||||
};
|
||||
|
||||
export { myQueue };
|
||||
|
||||
@@ -4,11 +4,11 @@ import {
|
||||
createDefaultServerTraefikConfig,
|
||||
createDefaultTraefikConfig,
|
||||
IS_CLOUD,
|
||||
initCancelDeployments,
|
||||
initCronJobs,
|
||||
initializeNetwork,
|
||||
initSchedules,
|
||||
initVolumeBackupsCronJobs,
|
||||
initCancelDeployments,
|
||||
sendDokployRestartNotifications,
|
||||
setupDirectories,
|
||||
} from "@dokploy/server";
|
||||
@@ -66,6 +66,8 @@ void app.prepare().then(async () => {
|
||||
console.log(`Server Started on: http://${HOST}:${PORT}`);
|
||||
if (!IS_CLOUD) {
|
||||
console.log("Starting Deployment Worker");
|
||||
// Import the handler module to ensure it's initialized
|
||||
await import("./queues/deployments-queue");
|
||||
const { deploymentWorker } = await import("./queues/deployments-queue");
|
||||
await deploymentWorker.run();
|
||||
}
|
||||
|
||||
@@ -97,7 +97,7 @@ docker ${buildArgs.join(" ")} || {
|
||||
exit 1;
|
||||
}
|
||||
echo "✅ Railpack build completed." ;
|
||||
docker buildx rm builder-containerd
|
||||
docker buildx rm builder-containerd || true
|
||||
`;
|
||||
|
||||
return bashCommand;
|
||||
|
||||
Reference in New Issue
Block a user