Files
dokploy/apps/api/src/index.ts
T
Mauricio Siu 08c9113405 feat: implement deployment jobs API and enhance queue management
Added a new endpoint to fetch deployment jobs for a server, integrating with the Inngest API to retrieve job details. Updated the queue management system to support centralized job retrieval for cloud environments, improving the deployment monitoring experience. Enhanced the UI to include action buttons for job cancellation and improved error handling for job fetching.
2026-03-03 01:04:26 -06:00

215 lines
4.8 KiB
TypeScript

import { serve } from "@hono/node-server";
import { Hono } from "hono";
import "dotenv/config";
import { zValidator } from "@hono/zod-validator";
import { Inngest } from "inngest";
import { serve as serveInngest } from "inngest/hono";
import { logger } from "./logger.js";
import {
cancelDeploymentSchema,
type DeployJob,
deployJobSchema,
} from "./schema.js";
import { fetchDeploymentJobs } from "./service.js";
import { deploy } from "./utils.js";
const app = new Hono();
// Initialize Inngest client
export const inngest = new Inngest({
id: "dokploy-deployments",
name: "Dokploy Deployment Service",
});
export const deploymentFunction = inngest.createFunction(
{
id: "deploy-application",
name: "Deploy Application",
concurrency: [
{
key: "event.data.serverId",
limit: 1,
},
],
retries: 0,
cancelOn: [
{
event: "deployment/cancelled",
if: "async.data.applicationId == event.data.applicationId || async.data.composeId == event.data.composeId",
timeout: "1h", // Allow cancellation for up to 1 hour
},
],
},
{ event: "deployment/requested" },
async ({ event, step }) => {
const jobData = event.data as DeployJob;
return await step.run("execute-deployment", async () => {
logger.info("Deploying started");
try {
const result = await deploy(jobData);
logger.info("Deployment finished", result);
// Send success event
await inngest.send({
name: "deployment/completed",
data: {
...jobData,
result,
status: "success",
},
});
return result;
} catch (error) {
logger.error("Deployment failed", { jobData, error });
// Send failure event
await inngest.send({
name: "deployment/failed",
data: {
...jobData,
error: error instanceof Error ? error.message : String(error),
status: "failed",
},
});
throw error;
}
});
},
);
app.use(async (c, next) => {
if (c.req.path === "/health" || c.req.path === "/api/inngest") {
return next();
}
const authHeader = c.req.header("X-API-Key");
if (process.env.API_KEY !== authHeader) {
return c.json({ message: "Invalid API Key" }, 403);
}
return next();
});
app.post("/deploy", zValidator("json", deployJobSchema), async (c) => {
const data = c.req.valid("json");
logger.info("Received deployment request", data);
try {
// Send event to Inngest instead of adding to Redis queue
await inngest.send({
name: "deployment/requested",
data,
});
logger.info("Deployment event sent to Inngest", {
serverId: data.serverId,
});
return c.json(
{
message: "Deployment Added to Inngest Queue",
serverId: data.serverId,
},
200,
);
} catch (error) {
logger.error("Failed to send deployment event", error);
return c.json(
{
message: "Failed to queue deployment",
error: error instanceof Error ? error.message : String(error),
},
500,
);
}
});
app.post(
"/cancel-deployment",
zValidator("json", cancelDeploymentSchema),
async (c) => {
const data = c.req.valid("json");
logger.info("Received cancel deployment request", data);
try {
// Send cancellation event to Inngest
await inngest.send({
name: "deployment/cancelled",
data,
});
const identifier =
data.applicationType === "application"
? `applicationId: ${data.applicationId}`
: `composeId: ${data.composeId}`;
logger.info("Deployment cancellation event sent", {
...data,
identifier,
});
return c.json({
message: "Deployment cancellation requested",
applicationType: data.applicationType,
});
} catch (error) {
logger.error("Failed to send deployment cancellation event", error);
return c.json(
{
message: "Failed to cancel deployment",
error: error instanceof Error ? error.message : String(error),
},
500,
);
}
},
);
app.get("/health", async (c) => {
return c.json({ status: "ok" });
});
// List deployment jobs (Inngest runs) for a server - same shape as BullMQ queue for the UI
app.get("/jobs", async (c) => {
const serverId = c.req.query("serverId");
if (!serverId) {
return c.json({ message: "serverId is required" }, 400);
}
try {
const rows = await fetchDeploymentJobs(serverId);
return c.json(rows);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (message.includes("INNGEST_BASE_URL")) {
return c.json(
{ message: "INNGEST_BASE_URL is required to list deployment jobs" },
503,
);
}
logger.error("Failed to fetch jobs from Inngest", { serverId, error });
return c.json([], 200);
}
});
// Serve Inngest functions endpoint
app.on(
["GET", "POST", "PUT"],
"/api/inngest",
serveInngest({
client: inngest,
functions: [deploymentFunction],
}),
);
const port = Number.parseInt(process.env.PORT || "3000");
logger.info("Starting Deployments Server with Inngest ✅", port);
serve({ fetch: app.fetch, port });