mirror of
https://github.com/go-gitea/gitea.git
synced 2026-06-14 03:29:55 +00:00
feat(actions)!: improve support for reusable workflows (#37478)
## Summary This PR improves reusable workflow support for Gitea Actions. The parsing of the called workflow now happens on Gitea side, not on the runner. When the caller becomes ready, Gitea fetches the called workflow source, parses it, and inserts each child job into the database as a `ActionRunJob` linked to the caller via `ParentCallJobID`. As a result, every callee job is dispatched as its own task and its logs surface as an independent job entry in the UI, rather than being inlined into the caller's "Set up job" step. This PR supports two kinds of `uses` : - same-repo call: `uses: ./.gitea/workflows/foo.yaml` - cross-repo call: `uses: OWNER/REPO/.gitea/workflows/foo.yaml@REF` ## **⚠️ BREAKING ⚠️** External reusable workflows (`uses: https://other-gitea-instance/OWNER/REPO/.gitea/workflows/test.yaml@REF`) are no longer supported. To keep using them, clone the repositories to the local instance. ## Main changes ### Execution model - Each caller job carries `IsReusableCaller=true` and won't be fetched by runners. - `ParentCallJobID` can link a called job to its caller. - Caller status is derived from its direct children. ### Workflow syntax - `jobparser` now supports parsing `on: workflow_call` trigger with `inputs:`, `outputs:`, and `secrets:` declarations. - **Max nesting depth**: capped at `MaxReusableCallLevels = 9`, which means a top-level caller may have at most 9 nested callers below it. - **Cycle prevention**: at expansion time, `checkCallerChain` walks the caller's ancestor chain via `ParentCallJobID` and rejects if the same `uses:` string appears anywhere upstream (`reusable workflow call cycle detected`). This catches both direct (`A -> A`) and indirect (`A -> B -> A`) cycles. ### Cross-repo access - To share reusable workflows from private repos, use `Collaborative Owners` introduced by #32562 ### Rerun semantics - `expandRerunJobIDs` partitions the latest attempt's jobs into: - a **rerun set**: jobs being rerun + downstream siblings within the same scope. - an **ancestor set**: reusable callers whose only *some* descendants are being rerun (the caller itself is not). - Cloning behavior for callers in `execRerunPlan`: - **Caller is fully rerun** (caller's `AttemptJobID` in `rerunSet`): none of its descendants are cloned. The caller is cloned with `IsCallerExpanded=false`, and re-expansion (which reinserts the children fresh) happens later when the resolver brings the caller to `Waiting` again. - **Caller is in ancestor set** (only some descendants rerun): the caller is pass-through (`Status` will be updated by its fresh children). Its non-rerun descendants are also pass-through clones (point `SourceTaskID` at the original task). Their `ParentCallJobID` is remapped to the new attempt's caller row. ### UI - Job list in `RepoActionView.vue` is now tree-shaped: callers indent their children. Callers default to collapsed. - New caller detail page using `WorkflowGraph` to show direct children only; the run summary's `WorkflowGraph` shows top-level callers and their immediate descendants. ### Known trade-offs - **Caller expansion runs inside the enclosing write transaction.** `expandReusableWorkflowCaller` performs a git read of the called workflow while holding the row locks that update the caller and insert its children. This is intentional: the caller-row update and child-row inserts must commit atomically. None of the call sites is hot (each caller is expanded once per attempt), so the trade-off is acceptable. - **A malformed `if:` expression on a job leaves it `Blocked` silently.** `evaluateJobIf` now runs server-side as part of resolver passes; deterministic expression errors (typos, undefined context fields) are logged but do not surface in the UI. This is the same behavior the resolver already had for concurrency-expression errors. Distinguishing transient DB errors from user-authored expression errors and writing the latter back as `StatusFailure` is a follow-up. #### Screenshots <img width="1600" alt="image" src="https://github.com/user-attachments/assets/bfaa9b7a-07e9-4127-8de9-a81f86e82828" /> <img width="1600" alt="image" src="https://github.com/user-attachments/assets/8af109b3-ef28-4b53-aaad-d4632b923224" /> ## References - https://docs.github.com/en/actions/how-tos/reuse-automations/reuse-workflows - https://docs.github.com/en/actions/reference/workflows-and-actions/reusing-workflow-configurations --- Replace #36388 --------- Signed-off-by: Zettat123 <zettat123@gmail.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Co-authored-by: silverwind <me@silverwind.io> Co-authored-by: Claude (Opus 4.7) <noreply@anthropic.com>
This commit is contained in:
@@ -5,16 +5,22 @@ package actions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
actions_model "gitea.dev/models/actions"
|
||||
"gitea.dev/models/db"
|
||||
repo_model "gitea.dev/models/repo"
|
||||
user_model "gitea.dev/models/user"
|
||||
"gitea.dev/modules/container"
|
||||
"gitea.dev/modules/log"
|
||||
)
|
||||
|
||||
func ApproveRuns(ctx context.Context, repo *repo_model.Repository, doer *user_model.User, runIDs []int64) error {
|
||||
updatedJobs := make([]*actions_model.ActionRunJob, 0)
|
||||
cancelledConcurrencyJobs := make([]*actions_model.ActionRunJob, 0)
|
||||
// Track runs whose reusable callers were just expanded so we can re-emit after the tx commits.
|
||||
expandedCallerRunIDs := make(container.Set[int64])
|
||||
|
||||
err := db.WithTx(ctx, func(ctx context.Context) (err error) {
|
||||
for _, runID := range runIDs {
|
||||
@@ -31,6 +37,7 @@ func ApproveRuns(ctx context.Context, repo *repo_model.Repository, doer *user_mo
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
// Skip jobs with `needs`: they stay blocked until their dependencies finish,
|
||||
// at which point job_emitter will evaluate and start them.
|
||||
@@ -43,14 +50,38 @@ func ApproveRuns(ctx context.Context, repo *repo_model.Repository, doer *user_mo
|
||||
return err
|
||||
}
|
||||
cancelledConcurrencyJobs = append(cancelledConcurrencyJobs, jobsToCancel...)
|
||||
if job.Status == actions_model.StatusWaiting {
|
||||
n, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
|
||||
if job.Status != actions_model.StatusWaiting {
|
||||
continue
|
||||
}
|
||||
n, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n == 0 {
|
||||
continue
|
||||
}
|
||||
updatedJobs = append(updatedJobs, job)
|
||||
|
||||
// A top-level reusable caller was just unblocked by approval, expand it
|
||||
if job.IsReusableCaller && !job.IsExpanded {
|
||||
attempt, has, err := run.GetLatestAttempt(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get latest attempt of run %d: %w", run.ID, err)
|
||||
}
|
||||
if !has {
|
||||
return errors.New("run has no attempt")
|
||||
}
|
||||
vars, err := actions_model.GetVariablesOfRun(ctx, run)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n > 0 {
|
||||
updatedJobs = append(updatedJobs, job)
|
||||
if err := expandReusableWorkflowCaller(ctx, run, attempt, job, vars); err != nil {
|
||||
return fmt.Errorf("expand caller %d on approval: %w", job.ID, err)
|
||||
}
|
||||
if err := actions_model.RefreshReusableCallerStatus(ctx, job); err != nil {
|
||||
return fmt.Errorf("refresh caller %d status after approval-time expansion: %w", job.ID, err)
|
||||
}
|
||||
expandedCallerRunIDs.Add(run.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -60,6 +91,13 @@ func ApproveRuns(ctx context.Context, repo *repo_model.Repository, doer *user_mo
|
||||
return err
|
||||
}
|
||||
|
||||
// Re-emit AFTER the tx commits so the newly inserted callee rows transition Blocked -> Waiting.
|
||||
for runID := range expandedCallerRunIDs {
|
||||
if err := EmitJobsIfReadyByRun(runID); err != nil {
|
||||
log.Error("emit run %d after approval-time caller expansion: %v", runID, err)
|
||||
}
|
||||
}
|
||||
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, updatedJobs)
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, cancelledConcurrencyJobs)
|
||||
|
||||
|
||||
@@ -9,8 +9,6 @@ import (
|
||||
|
||||
actions_model "gitea.dev/models/actions"
|
||||
"gitea.dev/modules/actions/jobparser"
|
||||
"gitea.dev/modules/json"
|
||||
api "gitea.dev/modules/structs"
|
||||
|
||||
act_model "gitea.com/gitea/runner/act/model"
|
||||
"go.yaml.in/yaml/v4"
|
||||
@@ -29,7 +27,7 @@ func EvaluateRunConcurrencyFillModel(ctx context.Context, run *actions_model.Act
|
||||
jobResults := map[string]*jobparser.JobResult{"": {}}
|
||||
if inputs == nil {
|
||||
var err error
|
||||
inputs, err = getInputsFromRun(run)
|
||||
inputs, err = getWorkflowDispatchInputsFromRun(run)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get inputs: %w", err)
|
||||
}
|
||||
@@ -43,25 +41,6 @@ func EvaluateRunConcurrencyFillModel(ctx context.Context, run *actions_model.Act
|
||||
return nil
|
||||
}
|
||||
|
||||
func findJobNeedsAndFillJobResults(ctx context.Context, job *actions_model.ActionRunJob) (map[string]*jobparser.JobResult, error) {
|
||||
taskNeeds, err := FindTaskNeeds(ctx, job)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("find task needs: %w", err)
|
||||
}
|
||||
jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds))
|
||||
for jobID, taskNeed := range taskNeeds {
|
||||
jobResult := &jobparser.JobResult{
|
||||
Result: taskNeed.Result.String(),
|
||||
Outputs: taskNeed.Outputs,
|
||||
}
|
||||
jobResults[jobID] = jobResult
|
||||
}
|
||||
jobResults[job.JobID] = &jobparser.JobResult{
|
||||
Needs: job.Needs,
|
||||
}
|
||||
return jobResults, nil
|
||||
}
|
||||
|
||||
// EvaluateJobConcurrencyFillModel evaluates the expressions in a job-level concurrency,
|
||||
// and fills the job's model fields with `concurrency.group` and `concurrency.cancel-in-progress`.
|
||||
// Job-level concurrency may depend on other job's outputs (via `needs`): `concurrency.group: my-group-${{ needs.job1.outputs.out1 }}`
|
||||
@@ -86,7 +65,7 @@ func EvaluateJobConcurrencyFillModel(ctx context.Context, run *actions_model.Act
|
||||
|
||||
if inputs == nil {
|
||||
var err error
|
||||
inputs, err = getInputsFromRun(run)
|
||||
inputs, err = getInputsForJob(ctx, run, actionRunJob)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get inputs: %w", err)
|
||||
}
|
||||
@@ -104,14 +83,3 @@ func EvaluateJobConcurrencyFillModel(ctx context.Context, run *actions_model.Act
|
||||
actionRunJob.IsConcurrencyEvaluated = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func getInputsFromRun(run *actions_model.ActionRun) (map[string]any, error) {
|
||||
if run.Event != "workflow_dispatch" {
|
||||
return map[string]any{}, nil
|
||||
}
|
||||
var payload api.WorkflowDispatchPayload
|
||||
if err := json.Unmarshal([]byte(run.EventPayload), &payload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return payload.Inputs, nil
|
||||
}
|
||||
|
||||
+130
-13
@@ -11,11 +11,14 @@ import (
|
||||
actions_model "gitea.dev/models/actions"
|
||||
"gitea.dev/models/db"
|
||||
actions_module "gitea.dev/modules/actions"
|
||||
"gitea.dev/modules/actions/jobparser"
|
||||
"gitea.dev/modules/container"
|
||||
"gitea.dev/modules/git"
|
||||
"gitea.dev/modules/json"
|
||||
"gitea.dev/modules/log"
|
||||
"gitea.dev/modules/optional"
|
||||
"gitea.dev/modules/setting"
|
||||
api "gitea.dev/modules/structs"
|
||||
"gitea.dev/modules/util"
|
||||
|
||||
"gitea.com/gitea/runner/act/model"
|
||||
@@ -96,6 +99,31 @@ func GenerateGiteaContext(ctx context.Context, run *actions_model.ActionRun, att
|
||||
if job != nil {
|
||||
gitContext["job"] = job.JobID
|
||||
gitContext["run_attempt"] = strconv.FormatInt(job.Attempt, 10)
|
||||
|
||||
if job.ParentJobID > 0 {
|
||||
// Inject the caller's resolved workflow_call inputs into gitea.event.inputs.
|
||||
// The rest of gitea.event stays as the caller's actual trigger event (push/pull_request/etc.)
|
||||
// to match GitHub's semantics (see https://docs.github.com/en/actions/reference/workflows-and-actions/reusing-workflow-configurations#github-context).
|
||||
// FIXME: If the run is triggered by "workflow_dispatch", the original inputs of "workflow_dispatch" will be overridden.
|
||||
// If necessary, the caller can send these values to the called workflow via `with:`.
|
||||
caller, err := actions_model.GetRunJobByRunAndID(ctx, job.RunID, job.ParentJobID)
|
||||
if err != nil {
|
||||
log.Error("GenerateGiteaContext: load caller job %d of job %d: %v", job.ParentJobID, job.ID, err)
|
||||
} else if caller.CallPayload != "" {
|
||||
var cp api.WorkflowCallPayload
|
||||
if err := json.Unmarshal([]byte(caller.CallPayload), &cp); err != nil {
|
||||
log.Error("GenerateGiteaContext: decode CallPayload of caller %d: %v", caller.ID, err)
|
||||
} else if cp.Inputs != nil {
|
||||
event["inputs"] = cp.Inputs
|
||||
}
|
||||
}
|
||||
|
||||
// Override gitea.event_name to "workflow_call", so that the runner-side `getEvaluatorInputs` can get inputs from event["inputs"].
|
||||
// https://gitea.com/gitea/runner/src/commit/0b9f251b6abb30d5f292a49cfe0c611f7c26d857/act/runner/expression.go#L509
|
||||
// FIXME: The trade-off is that `${{ gitea.event_name }}` inside a reusable workflow's child job reads "workflow_call"
|
||||
// instead of the caller's real trigger event name (push/pull_request/etc.) This is a small deviation from GitHub spec.
|
||||
gitContext["event_name"] = "workflow_call"
|
||||
}
|
||||
}
|
||||
|
||||
if attempt == nil {
|
||||
@@ -125,7 +153,8 @@ type TaskNeed struct {
|
||||
Outputs map[string]string
|
||||
}
|
||||
|
||||
// FindTaskNeeds finds the `needs` for the task by the task's job
|
||||
// FindTaskNeeds finds the `needs` for the task by the task's job.
|
||||
// Lookup is scoped to the same ParentJobID.
|
||||
func FindTaskNeeds(ctx context.Context, job *actions_model.ActionRunJob) (map[string]*TaskNeed, error) {
|
||||
if len(job.Needs) == 0 {
|
||||
return nil, nil //nolint:nilnil // return nil when the job has no needs
|
||||
@@ -144,8 +173,16 @@ func FindTaskNeeds(ctx context.Context, job *actions_model.ActionRunJob) (map[st
|
||||
}
|
||||
|
||||
jobIDJobs := make(map[string][]*actions_model.ActionRunJob)
|
||||
for _, job := range jobs {
|
||||
jobIDJobs[job.JobID] = append(jobIDJobs[job.JobID], job)
|
||||
// childrenByParent indexes every job by its ParentJobID
|
||||
childrenByParent := make(map[int64][]*actions_model.ActionRunJob)
|
||||
for _, candidate := range jobs {
|
||||
if candidate.ParentJobID != 0 {
|
||||
childrenByParent[candidate.ParentJobID] = append(childrenByParent[candidate.ParentJobID], candidate)
|
||||
}
|
||||
// `needs` references are scope-bound: only candidates in the same caller scope match.
|
||||
if candidate.ParentJobID == job.ParentJobID {
|
||||
jobIDJobs[candidate.JobID] = append(jobIDJobs[candidate.JobID], candidate)
|
||||
}
|
||||
}
|
||||
|
||||
ret := make(map[string]*TaskNeed, len(needs))
|
||||
@@ -154,19 +191,19 @@ func FindTaskNeeds(ctx context.Context, job *actions_model.ActionRunJob) (map[st
|
||||
continue
|
||||
}
|
||||
var jobOutputs map[string]string
|
||||
for _, job := range jobsWithSameID {
|
||||
taskID := job.EffectiveTaskID()
|
||||
if taskID == 0 || !job.Status.IsDone() {
|
||||
// it shouldn't happen
|
||||
for _, candidate := range jobsWithSameID {
|
||||
if !candidate.Status.IsDone() {
|
||||
continue
|
||||
}
|
||||
got, err := actions_model.FindTaskOutputByTaskID(ctx, taskID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("FindTaskOutputByTaskID: %w", err)
|
||||
var outputs map[string]string
|
||||
var err error
|
||||
if candidate.IsReusableCaller {
|
||||
outputs, err = computeReusableCallerOutputs(ctx, candidate, childrenByParent)
|
||||
} else {
|
||||
outputs, err = loadJobTaskOutputs(ctx, candidate)
|
||||
}
|
||||
outputs := make(map[string]string, len(got))
|
||||
for _, v := range got {
|
||||
outputs[v.OutputKey] = v.OutputValue
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(jobOutputs) == 0 {
|
||||
jobOutputs = outputs
|
||||
@@ -182,6 +219,86 @@ func FindTaskNeeds(ctx context.Context, job *actions_model.ActionRunJob) (map[st
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// computeReusableCallerOutputs returns the workflow_call outputs of a reusable caller by recursing into its child subtree.
|
||||
func computeReusableCallerOutputs(ctx context.Context, caller *actions_model.ActionRunJob, childrenByParent map[int64][]*actions_model.ActionRunJob) (map[string]string, error) {
|
||||
if !caller.IsExpanded {
|
||||
// A caller that was never expanded (e.g. Skipped because its `if:` was false) has no workflow_call outputs, return early.
|
||||
return map[string]string{}, nil
|
||||
}
|
||||
|
||||
directChildren := childrenByParent[caller.ID]
|
||||
|
||||
if err := caller.LoadRun(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wcSpec, err := jobparser.ParseWorkflowCallSpec(caller.ReusableWorkflowContent)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(wcSpec.Outputs) == 0 {
|
||||
return map[string]string{}, nil
|
||||
}
|
||||
|
||||
// Per-job outputs over the children of this caller.
|
||||
jobOutputs := make(jobparser.JobOutputs, len(directChildren))
|
||||
for _, child := range directChildren {
|
||||
var outs map[string]string
|
||||
switch {
|
||||
case child.IsReusableCaller:
|
||||
outs, err = computeReusableCallerOutputs(ctx, child, childrenByParent)
|
||||
default:
|
||||
outs, err = loadJobTaskOutputs(ctx, child)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if existing, ok := jobOutputs[child.JobID]; ok {
|
||||
jobOutputs[child.JobID] = mergeTwoOutputs(outs, existing)
|
||||
} else {
|
||||
jobOutputs[child.JobID] = outs
|
||||
}
|
||||
}
|
||||
|
||||
// build contexts for evaluating outputs
|
||||
if err := caller.Run.LoadAttributes(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gitCtx := GenerateGiteaContext(ctx, caller.Run, nil, caller)
|
||||
vars, err := actions_model.GetVariablesOfRun(ctx, caller.Run)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
inputs := map[string]any{}
|
||||
if caller.CallPayload != "" {
|
||||
var p api.WorkflowCallPayload
|
||||
if err := json.Unmarshal([]byte(caller.CallPayload), &p); err != nil {
|
||||
return nil, fmt.Errorf("decode caller payload: %w", err)
|
||||
}
|
||||
if p.Inputs != nil {
|
||||
inputs = p.Inputs
|
||||
}
|
||||
}
|
||||
|
||||
return jobparser.EvaluateWorkflowCallOutputs(wcSpec, gitCtx.ToGitHubContext(), vars, inputs, jobOutputs)
|
||||
}
|
||||
|
||||
// loadJobTaskOutputs returns the task-output map of `job`.
|
||||
func loadJobTaskOutputs(ctx context.Context, job *actions_model.ActionRunJob) (map[string]string, error) {
|
||||
tid := job.EffectiveTaskID()
|
||||
if tid == 0 {
|
||||
return map[string]string{}, nil
|
||||
}
|
||||
rows, err := actions_model.FindTaskOutputByTaskID(ctx, tid)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("FindTaskOutputByTaskID: %w", err)
|
||||
}
|
||||
out := make(map[string]string, len(rows))
|
||||
for _, r := range rows {
|
||||
out[r.OutputKey] = r.OutputValue
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// mergeTwoOutputs merges two outputs from two different ActionRunJobs
|
||||
// Values with the same output name may be overridden. The user should ensure the output names are unique.
|
||||
// See https://docs.github.com/en/actions/writing-workflows/workflow-syntax-for-github-actions#using-job-outputs-in-a-matrix-job
|
||||
|
||||
@@ -8,7 +8,10 @@ import (
|
||||
"testing"
|
||||
|
||||
actions_model "gitea.dev/models/actions"
|
||||
"gitea.dev/models/db"
|
||||
"gitea.dev/models/unittest"
|
||||
"gitea.dev/modules/json"
|
||||
api "gitea.dev/modules/structs"
|
||||
|
||||
act_model "gitea.com/gitea/runner/act/model"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -16,10 +19,8 @@ import (
|
||||
)
|
||||
|
||||
func TestEvaluateRunConcurrency_RunIDFallback(t *testing.T) {
|
||||
// Unit-level check that EvaluateRunConcurrencyFillModel resolves
|
||||
// github.run_id from run.ID. The full-flow regression — that run.ID is
|
||||
// non-zero by the time evaluation happens — is in
|
||||
// TestPrepareRunAndInsert_ExpressionsSeeRunID.
|
||||
// Unit-level check that EvaluateRunConcurrencyFillModel resolves github.run_id from run.ID.
|
||||
// The full-flow regression (run.ID non-zero by evaluation time) is TestPrepareRunAndInsert_ExpressionsSeeRunID.
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
ctx := t.Context()
|
||||
|
||||
@@ -43,10 +44,8 @@ func TestEvaluateRunConcurrency_RunIDFallback(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPrepareRunAndInsert_ExpressionsSeeRunID(t *testing.T) {
|
||||
// Regression for the cross-branch concurrency leak: github.run_id must
|
||||
// be available during BOTH jobparser.Parse (run-name) and workflow-level
|
||||
// concurrency evaluation. Re-ordering db.Insert relative to either step
|
||||
// would leave run.ID at 0 and break this test.
|
||||
// Regression for the cross-branch concurrency leak: github.run_id must be available during both
|
||||
// jobparser.Parse (run-name) and concurrency evaluation; inserting run after either leaves run.ID at 0.
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
ctx := t.Context()
|
||||
|
||||
@@ -90,6 +89,219 @@ jobs:
|
||||
assert.NotEmpty(t, persisted.RawConcurrency)
|
||||
}
|
||||
|
||||
func TestComputeReusableCallerOutputs(t *testing.T) {
|
||||
require.NoError(t, unittest.PrepareTestDatabase())
|
||||
ctx := t.Context()
|
||||
|
||||
var nextRunIndex int64 = 9001
|
||||
insertRun := func(t *testing.T, workflowID string) *actions_model.ActionRun {
|
||||
t.Helper()
|
||||
run := &actions_model.ActionRun{
|
||||
Title: "reusable-out",
|
||||
RepoID: 4,
|
||||
Index: nextRunIndex,
|
||||
OwnerID: 1,
|
||||
WorkflowID: workflowID,
|
||||
TriggerUserID: 1,
|
||||
Ref: "refs/heads/master",
|
||||
CommitSHA: "c2d72f548424103f01ee1dc02889c1e2bff816b0",
|
||||
Event: "push",
|
||||
TriggerEvent: "push",
|
||||
EventPayload: "{}",
|
||||
Status: actions_model.StatusSuccess,
|
||||
}
|
||||
nextRunIndex++
|
||||
require.NoError(t, db.Insert(ctx, run))
|
||||
return run
|
||||
}
|
||||
|
||||
insertCaller := func(t *testing.T, run *actions_model.ActionRun, jobID string, parentID int64, content, callPayload string) *actions_model.ActionRunJob {
|
||||
t.Helper()
|
||||
job := &actions_model.ActionRunJob{
|
||||
RunID: run.ID,
|
||||
RepoID: run.RepoID,
|
||||
OwnerID: run.OwnerID,
|
||||
CommitSHA: run.CommitSHA,
|
||||
Name: jobID,
|
||||
JobID: jobID,
|
||||
Attempt: 1,
|
||||
Status: actions_model.StatusSuccess,
|
||||
ParentJobID: parentID,
|
||||
IsReusableCaller: true,
|
||||
IsExpanded: true,
|
||||
ReusableWorkflowContent: []byte(content),
|
||||
CallPayload: callPayload,
|
||||
}
|
||||
require.NoError(t, db.Insert(ctx, job))
|
||||
return job
|
||||
}
|
||||
|
||||
// Each call to insertChildJobAndTask with non-empty outputs allocates a fresh TaskID
|
||||
// so its action_task_output rows stay isolated per subtest.
|
||||
var nextTaskID int64 = 90001
|
||||
insertChildJobAndTask := func(t *testing.T, run *actions_model.ActionRun, jobID string, parentID int64, outputs map[string]string) *actions_model.ActionRunJob {
|
||||
t.Helper()
|
||||
var taskID int64
|
||||
if len(outputs) > 0 {
|
||||
taskID = nextTaskID
|
||||
nextTaskID++
|
||||
}
|
||||
job := &actions_model.ActionRunJob{
|
||||
RunID: run.ID,
|
||||
RepoID: run.RepoID,
|
||||
OwnerID: run.OwnerID,
|
||||
CommitSHA: run.CommitSHA,
|
||||
Name: jobID,
|
||||
JobID: jobID,
|
||||
Attempt: 1,
|
||||
Status: actions_model.StatusSuccess,
|
||||
ParentJobID: parentID,
|
||||
TaskID: taskID,
|
||||
}
|
||||
require.NoError(t, db.Insert(ctx, job))
|
||||
for k, v := range outputs {
|
||||
require.NoError(t, db.Insert(ctx, &actions_model.ActionTaskOutput{
|
||||
TaskID: taskID,
|
||||
OutputKey: k,
|
||||
OutputValue: v,
|
||||
}))
|
||||
}
|
||||
return job
|
||||
}
|
||||
|
||||
// childrenByParentOfRun returns the run's jobs indexed by ParentJobID, the shape computeReusableCallerOutputs expects.
|
||||
childrenByParentOfRun := func(t *testing.T, runID int64) map[int64][]*actions_model.ActionRunJob {
|
||||
t.Helper()
|
||||
all, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID})
|
||||
require.NoError(t, err)
|
||||
index := make(map[int64][]*actions_model.ActionRunJob)
|
||||
for _, j := range all {
|
||||
if j.ParentJobID != 0 {
|
||||
index[j.ParentJobID] = append(index[j.ParentJobID], j)
|
||||
}
|
||||
}
|
||||
return index
|
||||
}
|
||||
|
||||
t.Run("returns empty when callee declares no outputs", func(t *testing.T) {
|
||||
run := insertRun(t, "no-outputs.yaml")
|
||||
caller := insertCaller(t, run, "caller", 0, `on:
|
||||
workflow_call:
|
||||
outputs: {}
|
||||
`, "")
|
||||
out, err := computeReusableCallerOutputs(ctx, caller, childrenByParentOfRun(t, run.ID))
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, out)
|
||||
})
|
||||
|
||||
t.Run("unexpanded (skipped) caller yields empty outputs without error", func(t *testing.T) {
|
||||
run := insertRun(t, "skipped-caller.yaml")
|
||||
// A reusable caller skipped before expansion: IsExpanded=false, empty ReusableWorkflowContent, no children.
|
||||
caller := &actions_model.ActionRunJob{
|
||||
RunID: run.ID,
|
||||
RepoID: run.RepoID,
|
||||
OwnerID: run.OwnerID,
|
||||
CommitSHA: run.CommitSHA,
|
||||
Name: "caller",
|
||||
JobID: "caller",
|
||||
Attempt: 1,
|
||||
Status: actions_model.StatusSkipped,
|
||||
IsReusableCaller: true,
|
||||
IsExpanded: false,
|
||||
}
|
||||
require.NoError(t, db.Insert(ctx, caller))
|
||||
out, err := computeReusableCallerOutputs(ctx, caller, childrenByParentOfRun(t, run.ID))
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, out)
|
||||
})
|
||||
|
||||
t.Run("literal output value passes through", func(t *testing.T) {
|
||||
run := insertRun(t, "literal-out.yaml")
|
||||
caller := insertCaller(t, run, "caller", 0, `on:
|
||||
workflow_call:
|
||||
outputs:
|
||||
hello:
|
||||
value: world
|
||||
`, "")
|
||||
out, err := computeReusableCallerOutputs(ctx, caller, childrenByParentOfRun(t, run.ID))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"hello": "world"}, out)
|
||||
})
|
||||
|
||||
t.Run("output expression reads child task outputs", func(t *testing.T) {
|
||||
run := insertRun(t, "child-out.yaml")
|
||||
caller := insertCaller(t, run, "caller", 0, `on:
|
||||
workflow_call:
|
||||
outputs:
|
||||
result:
|
||||
value: ${{ jobs.child.outputs.foo }}
|
||||
`, "")
|
||||
insertChildJobAndTask(t, run, "child", caller.ID, map[string]string{"foo": "bar"})
|
||||
|
||||
out, err := computeReusableCallerOutputs(ctx, caller, childrenByParentOfRun(t, run.ID))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"result": "bar"}, out)
|
||||
})
|
||||
|
||||
t.Run("CallPayload inputs reachable in output expression", func(t *testing.T) {
|
||||
run := insertRun(t, "payload-out.yaml")
|
||||
payload, err := json.Marshal(api.WorkflowCallPayload{
|
||||
Inputs: map[string]any{"env": "staging"},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
caller := insertCaller(t, run, "caller", 0, `on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
env:
|
||||
type: string
|
||||
outputs:
|
||||
env:
|
||||
value: ${{ inputs.env }}
|
||||
`, string(payload))
|
||||
|
||||
out, err := computeReusableCallerOutputs(ctx, caller, childrenByParentOfRun(t, run.ID))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"env": "staging"}, out)
|
||||
})
|
||||
|
||||
t.Run("nested caller outputs propagate to outer", func(t *testing.T) {
|
||||
run := insertRun(t, "nested-out.yaml")
|
||||
outer := insertCaller(t, run, "outer", 0, `on:
|
||||
workflow_call:
|
||||
outputs:
|
||||
bubbled:
|
||||
value: ${{ jobs.inner.outputs.up }}
|
||||
`, "")
|
||||
inner := insertCaller(t, run, "inner", outer.ID, `on:
|
||||
workflow_call:
|
||||
outputs:
|
||||
up:
|
||||
value: ${{ jobs.leaf.outputs.foo }}
|
||||
`, "")
|
||||
insertChildJobAndTask(t, run, "leaf", inner.ID, map[string]string{"foo": "bubble-value"})
|
||||
|
||||
out, err := computeReusableCallerOutputs(ctx, outer, childrenByParentOfRun(t, run.ID))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"bubbled": "bubble-value"}, out)
|
||||
})
|
||||
|
||||
t.Run("matrix children with same JobID prefer non-empty values", func(t *testing.T) {
|
||||
run := insertRun(t, "matrix-out.yaml")
|
||||
caller := insertCaller(t, run, "caller", 0, `on:
|
||||
workflow_call:
|
||||
outputs:
|
||||
foo:
|
||||
value: ${{ jobs.matrix.outputs.foo }}
|
||||
`, "")
|
||||
insertChildJobAndTask(t, run, "matrix", caller.ID, map[string]string{"foo": ""})
|
||||
insertChildJobAndTask(t, run, "matrix", caller.ID, map[string]string{"foo": "filled"})
|
||||
|
||||
out, err := computeReusableCallerOutputs(ctx, caller, childrenByParentOfRun(t, run.ID))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"foo": "filled"}, out)
|
||||
})
|
||||
}
|
||||
|
||||
func TestFindTaskNeeds(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
|
||||
|
||||
@@ -0,0 +1,92 @@
|
||||
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
actions_model "gitea.dev/models/actions"
|
||||
"gitea.dev/modules/actions/jobparser"
|
||||
"gitea.dev/modules/json"
|
||||
api "gitea.dev/modules/structs"
|
||||
)
|
||||
|
||||
func getWorkflowDispatchInputsFromRun(run *actions_model.ActionRun) (map[string]any, error) {
|
||||
if run.Event != "workflow_dispatch" {
|
||||
return map[string]any{}, nil
|
||||
}
|
||||
var payload api.WorkflowDispatchPayload
|
||||
if err := json.Unmarshal([]byte(run.EventPayload), &payload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return payload.Inputs, nil
|
||||
}
|
||||
|
||||
// getInputsForJob returns the `inputs.*` top-level expression context for a job's evaluation.
|
||||
// - For top-level jobs, it falls back to the run's dispatch inputs (empty for non-dispatch events)
|
||||
// - For reusable workflow children (and nested callers), this is the direct parent caller's CallPayload.Inputs
|
||||
func getInputsForJob(ctx context.Context, run *actions_model.ActionRun, job *actions_model.ActionRunJob) (map[string]any, error) {
|
||||
if job.ParentJobID == 0 {
|
||||
return getWorkflowDispatchInputsFromRun(run)
|
||||
}
|
||||
|
||||
caller, err := actions_model.GetRunJobByRunAndID(ctx, run.ID, job.ParentJobID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("load caller job %d: %w", job.ParentJobID, err)
|
||||
}
|
||||
if caller.CallPayload == "" {
|
||||
// should not happen - a child job cannot reach this point if its caller's CallPayload hasn't been evaluated
|
||||
return map[string]any{}, nil
|
||||
}
|
||||
var p api.WorkflowCallPayload
|
||||
if err := json.Unmarshal([]byte(caller.CallPayload), &p); err != nil {
|
||||
return nil, fmt.Errorf("decode caller %d payload: %w", caller.ID, err)
|
||||
}
|
||||
if p.Inputs == nil {
|
||||
return map[string]any{}, nil
|
||||
}
|
||||
return p.Inputs, nil
|
||||
}
|
||||
|
||||
// evaluateJobIf evaluates a job's `if:`
|
||||
func evaluateJobIf(ctx context.Context, run *actions_model.ActionRun, attempt *actions_model.ActionRunAttempt, job *actions_model.ActionRunJob, vars map[string]string, allNeedsSucceed bool) (bool, error) {
|
||||
parsedJob, err := job.ParseJob()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
// Empty `if:` reduces to implicit `success()` - true iff every need finished as Success.
|
||||
if len(parsedJob.If.Value) == 0 {
|
||||
return allNeedsSucceed, nil
|
||||
}
|
||||
jobResults, err := findJobNeedsAndFillJobResults(ctx, job)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
inputs, err := getInputsForJob(ctx, run, job)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
gitCtx := GenerateGiteaContext(ctx, run, attempt, job)
|
||||
return jobparser.EvaluateJobIfExpression(job.JobID, parsedJob, gitCtx, jobResults, vars, inputs)
|
||||
}
|
||||
|
||||
func findJobNeedsAndFillJobResults(ctx context.Context, job *actions_model.ActionRunJob) (map[string]*jobparser.JobResult, error) {
|
||||
taskNeeds, err := FindTaskNeeds(ctx, job)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("find task needs: %w", err)
|
||||
}
|
||||
jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds))
|
||||
for jobID, taskNeed := range taskNeeds {
|
||||
jobResult := &jobparser.JobResult{
|
||||
Result: taskNeed.Result.String(),
|
||||
Outputs: taskNeed.Outputs,
|
||||
}
|
||||
jobResults[jobID] = jobResult
|
||||
}
|
||||
jobResults[job.JobID] = &jobparser.JobResult{
|
||||
Needs: job.Needs,
|
||||
}
|
||||
return jobResults, nil
|
||||
}
|
||||
+136
-73
@@ -69,39 +69,48 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("get action run: %w", err)
|
||||
}
|
||||
var jobs, updatedJobs, cancelledJobs []*actions_model.ActionRunJob
|
||||
var result jobsCheckResult
|
||||
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
||||
// check jobs of the current run
|
||||
if js, ujs, cjs, err := checkJobsOfCurrentRunAttempt(ctx, run); err != nil {
|
||||
r, err := checkJobsOfCurrentRunAttempt(ctx, run)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
jobs = append(jobs, js...)
|
||||
updatedJobs = append(updatedJobs, ujs...)
|
||||
cancelledJobs = append(cancelledJobs, cjs...)
|
||||
}
|
||||
if js, ujs, cjs, err := checkRunConcurrency(ctx, run); err != nil {
|
||||
result.merge(r)
|
||||
|
||||
r, err = checkRunConcurrency(ctx, run)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
jobs = append(jobs, js...)
|
||||
updatedJobs = append(updatedJobs, ujs...)
|
||||
cancelledJobs = append(cancelledJobs, cjs...)
|
||||
}
|
||||
result.merge(r)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, cancelledJobs)
|
||||
EmitJobsIfReadyByJobs(cancelledJobs)
|
||||
if err := createCommitStatusesForJobsByRun(ctx, jobs); err != nil {
|
||||
// Re-emit AFTER the transaction commits; doing this inside WithTx would deadlock under
|
||||
// immediate-mode queues (the inline handler reopens checkJobsByRunID and asks for a
|
||||
// nested writer transaction while the outer one is still open).
|
||||
emitted := make(container.Set[int64])
|
||||
for _, rid := range result.RunIDsToReEmit {
|
||||
if !emitted.Add(rid) {
|
||||
continue
|
||||
}
|
||||
if err := EmitJobsIfReadyByRun(rid); err != nil {
|
||||
log.Error("re-emit run %d after caller expansion: %v", rid, err)
|
||||
}
|
||||
}
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, result.CancelledJobs)
|
||||
EmitJobsIfReadyByJobs(result.CancelledJobs)
|
||||
if err := createCommitStatusesForJobsByRun(ctx, result.Jobs); err != nil {
|
||||
return err
|
||||
}
|
||||
NotifyWorkflowJobsStatusUpdate(ctx, updatedJobs...)
|
||||
NotifyWorkflowJobsStatusUpdate(ctx, result.UpdatedJobs...)
|
||||
runJobs := make(map[int64][]*actions_model.ActionRunJob)
|
||||
for _, job := range jobs {
|
||||
for _, job := range result.Jobs {
|
||||
runJobs[job.RunID] = append(runJobs[job.RunID], job)
|
||||
}
|
||||
runUpdatedJobs := make(map[int64][]*actions_model.ActionRunJob)
|
||||
for _, uj := range updatedJobs {
|
||||
for _, uj := range result.UpdatedJobs {
|
||||
runUpdatedJobs[uj.RunID] = append(runUpdatedJobs[uj.RunID], uj)
|
||||
}
|
||||
for runID, js := range runJobs {
|
||||
@@ -158,20 +167,22 @@ func findBlockedRunIDByConcurrency(ctx context.Context, repoID int64, concurrenc
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func checkBlockedConcurrentRun(ctx context.Context, repoID, runID int64) (jobs, updatedJobs, cancelledJobs []*actions_model.ActionRunJob, err error) {
|
||||
func checkBlockedConcurrentRun(ctx context.Context, repoID, runID int64) (*jobsCheckResult, error) {
|
||||
concurrentRun, err := actions_model.GetRunByRepoAndID(ctx, repoID, runID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("get run %d: %w", runID, err)
|
||||
return nil, fmt.Errorf("get run %d: %w", runID, err)
|
||||
}
|
||||
if concurrentRun.NeedApproval {
|
||||
return nil, nil, nil, nil
|
||||
return &jobsCheckResult{}, nil
|
||||
}
|
||||
|
||||
return checkJobsOfCurrentRunAttempt(ctx, concurrentRun)
|
||||
}
|
||||
|
||||
// checkRunConcurrency rechecks runs blocked by concurrency that may become unblocked after the current run releases a workflow-level or job-level concurrency group.
|
||||
func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs, cancelledJobs []*actions_model.ActionRunJob, err error) {
|
||||
// RunIDsToReEmit propagates from inner checkJobsOfCurrentRunAttempt calls; see that function's doc.
|
||||
func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (*jobsCheckResult, error) {
|
||||
result := &jobsCheckResult{}
|
||||
checkedConcurrencyGroup := make(container.Set[string])
|
||||
|
||||
collect := func(concurrencyGroup string) error {
|
||||
@@ -180,13 +191,11 @@ func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (job
|
||||
return fmt.Errorf("find blocked run by concurrency: %w", err)
|
||||
}
|
||||
if concurrentRunID > 0 {
|
||||
js, ujs, cjs, err := checkBlockedConcurrentRun(ctx, run.RepoID, concurrentRunID)
|
||||
r, err := checkBlockedConcurrentRun(ctx, run.RepoID, concurrentRunID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
jobs = append(jobs, js...)
|
||||
updatedJobs = append(updatedJobs, ujs...)
|
||||
cancelledJobs = append(cancelledJobs, cjs...)
|
||||
result.merge(r)
|
||||
}
|
||||
checkedConcurrencyGroup.Add(concurrencyGroup)
|
||||
return nil
|
||||
@@ -195,18 +204,18 @@ func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (job
|
||||
// check run (workflow-level) concurrency
|
||||
runConcurrencyGroup, _, err := run.GetEffectiveConcurrency(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("GetEffectiveConcurrency: %w", err)
|
||||
return nil, fmt.Errorf("GetEffectiveConcurrency: %w", err)
|
||||
}
|
||||
if runConcurrencyGroup != "" {
|
||||
if err := collect(runConcurrencyGroup); err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// check job concurrency
|
||||
runJobs, err := actions_model.GetLatestAttemptJobsByRepoAndRunID(ctx, run.RepoID, run.ID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
|
||||
return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
|
||||
}
|
||||
for _, job := range runJobs {
|
||||
if !job.Status.IsDone() {
|
||||
@@ -216,42 +225,47 @@ func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (job
|
||||
continue
|
||||
}
|
||||
if err := collect(job.ConcurrencyGroup); err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return jobs, updatedJobs, cancelledJobs, nil
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// checkJobsOfCurrentRunAttempt resolves blocked jobs of the run's latest attempt.
|
||||
func checkJobsOfCurrentRunAttempt(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs, cancelledJobs []*actions_model.ActionRunJob, err error) {
|
||||
jobs, err = actions_model.GetRunJobsByRunAndAttemptID(ctx, run.ID, run.LatestAttemptID)
|
||||
func checkJobsOfCurrentRunAttempt(ctx context.Context, run *actions_model.ActionRun) (*jobsCheckResult, error) {
|
||||
jobs, err := actions_model.GetRunJobsByRunAndAttemptID(ctx, run.ID, run.LatestAttemptID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
result := &jobsCheckResult{Jobs: jobs}
|
||||
|
||||
var attempt *actions_model.ActionRunAttempt
|
||||
if run.LatestAttemptID > 0 {
|
||||
attempt, err = actions_model.GetRunAttemptByRepoAndID(ctx, run.RepoID, run.LatestAttemptID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// The resolver below only considers needs and job-level concurrency, so a run blocked
|
||||
// solely by run-level concurrency would have its jobs unblocked here. checkRunConcurrency
|
||||
// re-evaluates when the holding run finishes.
|
||||
if run.Status.IsBlocked() {
|
||||
attempt, has, err := run.GetLatestAttempt(ctx)
|
||||
if run.Status.IsBlocked() && attempt != nil {
|
||||
shouldBlock, err := shouldBlockRunByConcurrency(ctx, attempt)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("GetLatestAttempt: %w", err)
|
||||
return nil, fmt.Errorf("shouldBlockRunByConcurrency: %w", err)
|
||||
}
|
||||
if has {
|
||||
shouldBlock, err := shouldBlockRunByConcurrency(ctx, attempt)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("shouldBlockRunByConcurrency: %w", err)
|
||||
}
|
||||
if shouldBlock {
|
||||
return jobs, nil, nil, nil
|
||||
}
|
||||
if shouldBlock {
|
||||
return result, nil
|
||||
}
|
||||
}
|
||||
vars, err := actions_model.GetVariablesOfRun(ctx, run)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
resolver := newJobStatusResolver(jobs, vars)
|
||||
|
||||
expandedAnyCaller := false
|
||||
if err = db.WithTx(ctx, func(ctx context.Context) error {
|
||||
for _, job := range jobs {
|
||||
job.Run = run
|
||||
@@ -259,22 +273,47 @@ func checkJobsOfCurrentRunAttempt(ctx context.Context, run *actions_model.Action
|
||||
|
||||
updates := resolver.Resolve(ctx)
|
||||
for _, job := range jobs {
|
||||
if status, ok := updates[job.ID]; ok {
|
||||
job.Status = status
|
||||
if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
|
||||
return err
|
||||
} else if n != 1 {
|
||||
return fmt.Errorf("no affected for updating blocked job %v", job.ID)
|
||||
}
|
||||
updatedJobs = append(updatedJobs, job)
|
||||
status, ok := updates[job.ID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if job.IsReusableCaller {
|
||||
switch status {
|
||||
case actions_model.StatusWaiting:
|
||||
if err := expandReusableWorkflowCaller(ctx, run, attempt, job, vars); err != nil {
|
||||
return fmt.Errorf("trigger caller-ready %d: %w", job.ID, err)
|
||||
}
|
||||
// expandReusableWorkflowCaller inserts children as Blocked. They need a follow-up resolver pass.
|
||||
expandedAnyCaller = true
|
||||
case actions_model.StatusSkipped:
|
||||
job.Status = actions_model.StatusSkipped
|
||||
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "status"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Non-caller: standard status update.
|
||||
job.Status = status
|
||||
if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
|
||||
return err
|
||||
} else if n != 1 {
|
||||
return fmt.Errorf("no affected for updating blocked job %v", job.ID)
|
||||
}
|
||||
result.UpdatedJobs = append(result.UpdatedJobs, job)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return jobs, updatedJobs, resolver.cancelledJobs, nil
|
||||
if expandedAnyCaller {
|
||||
result.RunIDsToReEmit = append(result.RunIDsToReEmit, run.ID)
|
||||
}
|
||||
result.CancelledJobs = resolver.cancelledJobs
|
||||
return result, nil
|
||||
}
|
||||
|
||||
type jobStatusResolver struct {
|
||||
@@ -286,10 +325,17 @@ type jobStatusResolver struct {
|
||||
}
|
||||
|
||||
func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]string) *jobStatusResolver {
|
||||
idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
|
||||
// Scope-aware: needs are resolved within the same ParentJobID scope so the same
|
||||
// JobID in different reusable workflow calls does not cross-link.
|
||||
scopedIDToJobs := make(map[int64]map[string][]*actions_model.ActionRunJob)
|
||||
jobMap := make(map[int64]*actions_model.ActionRunJob)
|
||||
for _, job := range jobs {
|
||||
idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
|
||||
scope := scopedIDToJobs[job.ParentJobID]
|
||||
if scope == nil {
|
||||
scope = make(map[string][]*actions_model.ActionRunJob)
|
||||
scopedIDToJobs[job.ParentJobID] = scope
|
||||
}
|
||||
scope[job.JobID] = append(scope[job.JobID], job)
|
||||
jobMap[job.ID] = job
|
||||
}
|
||||
|
||||
@@ -297,8 +343,9 @@ func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]stri
|
||||
needs := make(map[int64][]int64, len(jobs))
|
||||
for _, job := range jobs {
|
||||
statuses[job.ID] = job.Status
|
||||
scope := scopedIDToJobs[job.ParentJobID]
|
||||
for _, need := range job.Needs {
|
||||
for _, v := range idToJobs[need] {
|
||||
for _, v := range scope[need] {
|
||||
needs[job.ID] = append(needs[job.ID], v.ID)
|
||||
}
|
||||
}
|
||||
@@ -340,14 +387,6 @@ func (r *jobStatusResolver) resolveCheckNeeds(id int64) (allDone, allSucceed boo
|
||||
return allDone, allSucceed
|
||||
}
|
||||
|
||||
func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model.ActionRunJob) (hasIf bool) {
|
||||
// FIXME evaluate this on the server side
|
||||
if job, err := actionRunJob.ParseJob(); err == nil {
|
||||
return len(job.If.Value) > 0
|
||||
}
|
||||
return hasIf
|
||||
}
|
||||
|
||||
func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status {
|
||||
ret := map[int64]actions_model.Status{}
|
||||
for id, status := range r.statuses {
|
||||
@@ -355,6 +394,12 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
|
||||
if status != actions_model.StatusBlocked {
|
||||
continue
|
||||
}
|
||||
// A child of a caller cannot start until the caller has become "ready" (children inserted, CallPayload populated).
|
||||
if actionRunJob.ParentJobID > 0 {
|
||||
if parent, ok := r.jobMap[actionRunJob.ParentJobID]; ok && !parent.IsExpanded {
|
||||
continue
|
||||
}
|
||||
}
|
||||
allDone, allSucceed := r.resolveCheckNeeds(id)
|
||||
if !allDone {
|
||||
continue
|
||||
@@ -365,18 +410,16 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
|
||||
if err != nil {
|
||||
// The err can be caused by different cases: database error, or syntax error, or the needed jobs haven't completed
|
||||
// At the moment there is no way to distinguish them.
|
||||
// Actually, for most cases, the error is caused by "syntax error" / "the needed jobs haven't completed (skipped?)"
|
||||
// TODO: if workflow or concurrency expression has syntax error, there should be a user error message, need to show it to end users
|
||||
log.Debug("updateConcurrencyEvaluationForJobWithNeeds failed, this job will stay blocked: job: %d, err: %v", id, err)
|
||||
continue
|
||||
}
|
||||
|
||||
shouldStartJob := true
|
||||
if !allSucceed {
|
||||
// Not all dependent jobs completed successfully:
|
||||
// * if the job has "if" condition, it can be started, then the act_runner will evaluate the "if" condition.
|
||||
// * otherwise, the job should be skipped.
|
||||
shouldStartJob = r.resolveJobHasIfCondition(actionRunJob)
|
||||
shouldStartJob, err := evaluateJobIf(ctx, actionRunJob.Run, nil, actionRunJob, r.vars, allSucceed)
|
||||
if err != nil {
|
||||
// TODO: surface deterministic expression errors to users by failing the job with a message.
|
||||
log.Error("evaluateJobIf failed, job will stay blocked: job: %d, err: %v", id, err)
|
||||
continue
|
||||
}
|
||||
|
||||
newStatus := util.Iif(shouldStartJob, actions_model.StatusWaiting, actions_model.StatusSkipped)
|
||||
@@ -420,3 +463,23 @@ func updateConcurrencyEvaluationForJobWithNeeds(ctx context.Context, actionRunJo
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// jobsCheckResult bundles the output of the per-run job-check helpers.
|
||||
type jobsCheckResult struct {
|
||||
// Jobs are all jobs of the run's latest attempt that were inspected.
|
||||
Jobs []*actions_model.ActionRunJob
|
||||
// UpdatedJobs are jobs whose status was transitioned out of Blocked in this pass.
|
||||
UpdatedJobs []*actions_model.ActionRunJob
|
||||
// CancelledJobs are jobs cancelled by job-level concurrency while preparing to start.
|
||||
CancelledJobs []*actions_model.ActionRunJob
|
||||
// RunIDsToReEmit are runs whose newly expanded reusable workflow callers need another resolver pass.
|
||||
RunIDsToReEmit []int64
|
||||
}
|
||||
|
||||
// merge appends another result's contents into r in place.
|
||||
func (r *jobsCheckResult) merge(other *jobsCheckResult) {
|
||||
r.Jobs = append(r.Jobs, other.Jobs...)
|
||||
r.UpdatedJobs = append(r.UpdatedJobs, other.UpdatedJobs...)
|
||||
r.CancelledJobs = append(r.CancelledJobs, other.CancelledJobs...)
|
||||
r.RunIDsToReEmit = append(r.RunIDsToReEmit, other.RunIDsToReEmit...)
|
||||
}
|
||||
|
||||
@@ -4,11 +4,14 @@
|
||||
package actions
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
actions_model "gitea.dev/models/actions"
|
||||
"gitea.dev/models/db"
|
||||
repo_model "gitea.dev/models/repo"
|
||||
"gitea.dev/models/unittest"
|
||||
user_model "gitea.dev/models/user"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
@@ -129,10 +132,48 @@ jobs:
|
||||
want: map[int64]actions_model.Status{2: actions_model.StatusSkipped},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
ctx := t.Context()
|
||||
stubRun := &actions_model.ActionRun{TriggerUser: &user_model.User{}, Repo: &repo_model.Repository{}}
|
||||
for i, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Each subtest gets a unique RunID / RunAttemptID so jobs from different subtests don't bleed into each other's FindTaskNeeds queries
|
||||
runID := int64(9001 + i)
|
||||
attemptID := int64(9001 + i)
|
||||
|
||||
// Insert each test job (letting the DB assign IDs) and remember the testID -> dbID mapping so we can translate the expected map.
|
||||
idMap := make(map[int64]int64, len(tt.jobs))
|
||||
for _, j := range tt.jobs {
|
||||
origID := j.ID
|
||||
j.ID = 0
|
||||
j.RunID = runID
|
||||
j.RunAttemptID = attemptID
|
||||
j.Run = stubRun
|
||||
|
||||
// The resolver evaluates Blocked jobs via evaluateJobIf, which needs a valid YAML payload;
|
||||
// supply a minimal one when the case didn't.
|
||||
if j.Status == actions_model.StatusBlocked && len(j.WorkflowPayload) == 0 {
|
||||
j.WorkflowPayload = fmt.Appendf(nil, `name: test
|
||||
on: push
|
||||
jobs:
|
||||
%s:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- run: echo
|
||||
`, j.JobID)
|
||||
}
|
||||
|
||||
assert.NoError(t, db.Insert(ctx, j))
|
||||
idMap[origID] = j.ID
|
||||
}
|
||||
|
||||
want := make(map[int64]actions_model.Status, len(tt.want))
|
||||
for k, v := range tt.want {
|
||||
want[idMap[k]] = v
|
||||
}
|
||||
|
||||
r := newJobStatusResolver(tt.jobs, nil)
|
||||
assert.Equal(t, tt.want, r.Resolve(t.Context()))
|
||||
assert.Equal(t, want, r.Resolve(ctx))
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -221,11 +262,11 @@ func Test_checkRunConcurrency_NoDuplicateConcurrencyGroupCheck(t *testing.T) {
|
||||
assert.NoError(t, db.Insert(ctx, jobBBlocked))
|
||||
|
||||
runA, _, _ = db.GetByID[actions_model.ActionRun](t.Context(), runA.ID)
|
||||
jobs, _, _, err := checkRunConcurrency(ctx, runA)
|
||||
result, err := checkRunConcurrency(ctx, runA)
|
||||
assert.NoError(t, err)
|
||||
|
||||
if assert.Len(t, jobs, 1) {
|
||||
assert.Equal(t, jobBBlocked.ID, jobs[0].ID)
|
||||
if assert.Len(t, result.Jobs, 1) {
|
||||
assert.Equal(t, jobBBlocked.ID, result.Jobs[0].ID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -286,9 +327,9 @@ jobs:
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, blockedJob))
|
||||
|
||||
_, updated, _, err := checkJobsOfCurrentRunAttempt(ctx, blockedRun)
|
||||
result, err := checkJobsOfCurrentRunAttempt(ctx, blockedRun)
|
||||
assert.NoError(t, err)
|
||||
assert.Empty(t, updated)
|
||||
assert.Empty(t, result.UpdatedJobs)
|
||||
|
||||
refreshed := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: blockedJob.ID})
|
||||
assert.Equal(t, actions_model.StatusBlocked, refreshed.Status)
|
||||
|
||||
+198
-30
@@ -14,6 +14,7 @@ import (
|
||||
"gitea.dev/models/unit"
|
||||
user_model "gitea.dev/models/user"
|
||||
"gitea.dev/modules/container"
|
||||
"gitea.dev/modules/log"
|
||||
"gitea.dev/modules/setting"
|
||||
"gitea.dev/modules/util"
|
||||
|
||||
@@ -42,6 +43,7 @@ func GetFailedJobsForRerun(allJobs []*actions_model.ActionRunJob) []*actions_mod
|
||||
// rather than one big outer transaction:
|
||||
// - execRerunPlan performs slow work (loading variables, YAML unmarshal, concurrency expression evaluation)
|
||||
// before opening its own transaction, so the tx stays focused on inserts/updates.
|
||||
// (Exception: reusable workflow caller expansion runs inside the tx, see expandReusableWorkflowCaller's doc.)
|
||||
// - The legacy backfill is idempotent-friendly: if it succeeds but a later stage fails, a subsequent rerun
|
||||
// will observe run.LatestAttemptID != 0 and skip the backfill, continuing naturally. No data corruption
|
||||
// or stuck state results from partial progress.
|
||||
@@ -112,8 +114,19 @@ type rerunPlan struct {
|
||||
run *actions_model.ActionRun
|
||||
templateAttempt *actions_model.ActionRunAttempt
|
||||
templateJobs actions_model.ActionJobList
|
||||
rerunJobIDs container.Set[string]
|
||||
triggerUser *user_model.User
|
||||
|
||||
// rerunAttemptJobIDs holds the AttemptJobIDs of jobs that will actually be re-run in the new attempt.
|
||||
// If a job here is a reusable caller, the whole subtree under it will be re-run.
|
||||
rerunAttemptJobIDs container.Set[int64]
|
||||
|
||||
// ancestorAttemptJobIDs holds the AttemptJobIDs of reusable caller jobs that have only some of their descendants being re-run:
|
||||
// the caller itself is NOT re-run as a whole, it stays pass-through and its non-rerun children stay pass-through too.
|
||||
ancestorAttemptJobIDs container.Set[int64]
|
||||
|
||||
// skipCloneTemplateJobIDs holds the template-attempt DB row IDs of descendants of any reusable caller in rerunAttemptJobIDs.
|
||||
// These jobs should not be cloned, since the caller's lazy expansion will re-insert them fresh.
|
||||
skipCloneTemplateJobIDs container.Set[int64]
|
||||
}
|
||||
|
||||
// buildRerunPlan constructs a rerunPlan for the given workflow run without writing to the database.
|
||||
@@ -151,6 +164,7 @@ func buildRerunPlan(ctx context.Context, run *actions_model.ActionRun, triggerUs
|
||||
if err := plan.expandRerunJobIDs(jobsToRerun); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
plan.skipCloneTemplateJobIDs = plan.collectResetCallerDescendants()
|
||||
|
||||
return plan, nil
|
||||
}
|
||||
@@ -188,6 +202,7 @@ func execRerunPlan(ctx context.Context, plan *rerunPlan) (*actions_model.ActionR
|
||||
|
||||
var newJobs, newJobsToRerun actions_model.ActionJobList
|
||||
var cancelledConcurrencyJobs []*actions_model.ActionRunJob
|
||||
var hasWaitingCallerJobs bool
|
||||
|
||||
err = db.WithTx(ctx, func(ctx context.Context) error {
|
||||
newAttemptStatus, jobsToCancel, err := PrepareToStartRunWithConcurrency(ctx, newAttempt)
|
||||
@@ -212,10 +227,30 @@ func execRerunPlan(ctx context.Context, plan *rerunPlan) (*actions_model.ActionR
|
||||
|
||||
hasWaitingJobs := false
|
||||
newJobs = make(actions_model.ActionJobList, 0, len(plan.templateJobs))
|
||||
newJobsToRerun = make(actions_model.ActionJobList, 0, len(plan.rerunJobIDs))
|
||||
newJobsToRerun = make(actions_model.ActionJobList, 0, len(plan.rerunAttemptJobIDs))
|
||||
|
||||
// templateIDToNewID maps each template-attempt job's DB ID to its newly-inserted clone's DB ID
|
||||
templateIDToNewID := make(map[int64]int64, len(plan.templateJobs))
|
||||
|
||||
for _, templateJob := range plan.templateJobs {
|
||||
// descendants of a reset reusable caller are not cloned at all, the caller will re-insert them
|
||||
if plan.skipCloneTemplateJobIDs.Contains(templateJob.ID) {
|
||||
continue
|
||||
}
|
||||
|
||||
newJob := cloneRunJobForAttempt(templateJob, newAttempt)
|
||||
if plan.rerunJobIDs.Contains(templateJob.JobID) {
|
||||
|
||||
// Remap ParentJobID from template attempts's DB ID -> new attempt's DB ID.
|
||||
if templateJob.ParentJobID != 0 {
|
||||
newParentID, ok := templateIDToNewID[templateJob.ParentJobID]
|
||||
if !ok {
|
||||
return fmt.Errorf("clone order violation: parent job %d not yet cloned for child %d",
|
||||
templateJob.ParentJobID, templateJob.ID)
|
||||
}
|
||||
newJob.ParentJobID = newParentID
|
||||
}
|
||||
|
||||
if plan.rerunAttemptJobIDs.Contains(templateJob.AttemptJobID) {
|
||||
shouldBlockJob := shouldBlock || plan.hasRerunDependency(templateJob)
|
||||
|
||||
newJob.Status = util.Iif(shouldBlockJob, actions_model.StatusBlocked, actions_model.StatusWaiting)
|
||||
@@ -227,6 +262,11 @@ func execRerunPlan(ctx context.Context, plan *rerunPlan) (*actions_model.ActionR
|
||||
newJob.ConcurrencyCancel = false
|
||||
newJob.IsConcurrencyEvaluated = false
|
||||
|
||||
if templateJob.IsReusableCaller {
|
||||
newJob.IsExpanded = false
|
||||
newJob.CallPayload = ""
|
||||
}
|
||||
|
||||
if newJob.RawConcurrency != "" && !shouldBlockJob {
|
||||
if err := EvaluateJobConcurrencyFillModel(ctx, plan.run, newAttempt, newJob, vars, nil); err != nil {
|
||||
return fmt.Errorf("evaluate job concurrency: %w", err)
|
||||
@@ -242,17 +282,45 @@ func execRerunPlan(ctx context.Context, plan *rerunPlan) (*actions_model.ActionR
|
||||
} else {
|
||||
newJob.TaskID = 0
|
||||
newJob.SourceTaskID = templateJob.EffectiveTaskID()
|
||||
newJob.Started = templateJob.Started
|
||||
newJob.Stopped = templateJob.Stopped
|
||||
|
||||
isAncestor := plan.ancestorAttemptJobIDs.Contains(templateJob.AttemptJobID)
|
||||
newJob.Started = util.Iif(isAncestor, 0, templateJob.Started)
|
||||
newJob.Stopped = util.Iif(isAncestor, 0, templateJob.Stopped)
|
||||
}
|
||||
|
||||
if err := db.Insert(ctx, newJob); err != nil {
|
||||
return err
|
||||
}
|
||||
hasWaitingJobs = hasWaitingJobs || newJob.Status == actions_model.StatusWaiting
|
||||
templateIDToNewID[templateJob.ID] = newJob.ID
|
||||
|
||||
// expand reusable caller
|
||||
if newJob.IsReusableCaller && newJob.Status == actions_model.StatusWaiting && !newJob.IsExpanded {
|
||||
if err := expandReusableWorkflowCaller(ctx, plan.run, newAttempt, newJob, vars); err != nil {
|
||||
return fmt.Errorf("inline trigger caller %d ready: %w", newJob.ID, err)
|
||||
}
|
||||
// refresh the caller status
|
||||
if err := actions_model.RefreshReusableCallerStatus(ctx, newJob); err != nil {
|
||||
return fmt.Errorf("refresh caller %d status: %w", newJob.ID, err)
|
||||
}
|
||||
hasWaitingCallerJobs = true
|
||||
}
|
||||
|
||||
// A reusable caller is never dispatched to a runner, so it must not drive the task-version bump.
|
||||
hasWaitingJobs = hasWaitingJobs || (newJob.Status == actions_model.StatusWaiting && !newJob.IsReusableCaller)
|
||||
newJobs = append(newJobs, newJob)
|
||||
}
|
||||
|
||||
// Refresh each ancestor's status from its now-fresh children.
|
||||
// `newJobs` is appended top-down (caller before its children), so we walk it in reverse to refresh the deepest ancestor first.
|
||||
for _, ancestor := range slices.Backward(newJobs) {
|
||||
if !ancestor.IsReusableCaller || !plan.ancestorAttemptJobIDs.Contains(ancestor.AttemptJobID) {
|
||||
continue
|
||||
}
|
||||
if err := actions_model.RefreshReusableCallerStatus(ctx, ancestor); err != nil {
|
||||
return fmt.Errorf("refresh ancestor caller %d status: %w", ancestor.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
newAttempt.Status = actions_model.AggregateJobStatus(newJobsToRerun)
|
||||
if err := actions_model.UpdateRunAttempt(ctx, newAttempt, "status"); err != nil {
|
||||
return err
|
||||
@@ -280,60 +348,149 @@ func execRerunPlan(ctx context.Context, plan *rerunPlan) (*actions_model.ActionR
|
||||
CreateCommitStatusForRunJobs(ctx, plan.run, newJobs...)
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, newJobsToRerun)
|
||||
|
||||
// Post-commit kick for expanded callers: let job_emitter resolve its child jobs
|
||||
if hasWaitingCallerJobs {
|
||||
if err := EmitJobsIfReadyByRun(plan.run.ID); err != nil {
|
||||
log.Error("emit run %d after rerun: %v", plan.run.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
return newAttempt, nil
|
||||
}
|
||||
|
||||
// expandRerunJobIDs computes rerunAttemptJobIDs and ancestorAttemptJobIDs from the user-selected jobsToRerun.
|
||||
func (p *rerunPlan) expandRerunJobIDs(jobsToRerun []*actions_model.ActionRunJob) error {
|
||||
templateJobIDs := make(container.Set[string])
|
||||
for _, job := range p.templateJobs {
|
||||
templateJobIDs.Add(job.JobID)
|
||||
}
|
||||
|
||||
// Empty jobsToRerun: rerun the whole latest attempt
|
||||
if len(jobsToRerun) == 0 {
|
||||
p.rerunJobIDs = templateJobIDs
|
||||
all := make(container.Set[int64], len(p.templateJobs))
|
||||
for _, job := range p.templateJobs {
|
||||
all.Add(job.AttemptJobID)
|
||||
}
|
||||
p.rerunAttemptJobIDs = all
|
||||
p.ancestorAttemptJobIDs = make(container.Set[int64])
|
||||
return nil
|
||||
}
|
||||
|
||||
rerunJobIDs := make(container.Set[string])
|
||||
byID := make(map[int64]*actions_model.ActionRunJob, len(p.templateJobs))
|
||||
byAttemptJobID := make(map[int64]*actions_model.ActionRunJob, len(p.templateJobs))
|
||||
for _, job := range p.templateJobs {
|
||||
byID[job.ID] = job
|
||||
byAttemptJobID[job.AttemptJobID] = job
|
||||
}
|
||||
|
||||
for _, job := range jobsToRerun {
|
||||
if !templateJobIDs.Contains(job.JobID) {
|
||||
if _, ok := byID[job.ID]; !ok {
|
||||
return util.NewInvalidArgumentErrorf("job %q does not exist in the latest attempt", job.JobID)
|
||||
}
|
||||
rerunJobIDs.Add(job.JobID)
|
||||
}
|
||||
|
||||
for {
|
||||
found := false
|
||||
for _, job := range p.templateJobs {
|
||||
if rerunJobIDs.Contains(job.JobID) {
|
||||
rerunSet := make(container.Set[int64])
|
||||
ancestorSet := make(container.Set[int64])
|
||||
queue := make([]*actions_model.ActionRunJob, 0, len(jobsToRerun))
|
||||
|
||||
for _, job := range jobsToRerun {
|
||||
j := byID[job.ID]
|
||||
rerunSet.Add(j.AttemptJobID)
|
||||
queue = append(queue, j)
|
||||
}
|
||||
|
||||
for len(queue) > 0 {
|
||||
cur := queue[0]
|
||||
queue = queue[1:]
|
||||
|
||||
// same-scope downstream: siblings whose Needs reference cur.JobID join the rerun set
|
||||
for _, candidate := range p.templateJobs {
|
||||
if candidate.ParentJobID != cur.ParentJobID {
|
||||
continue
|
||||
}
|
||||
for _, need := range job.Needs {
|
||||
if rerunJobIDs.Contains(need) {
|
||||
found = true
|
||||
rerunJobIDs.Add(job.JobID)
|
||||
break
|
||||
}
|
||||
if rerunSet.Contains(candidate.AttemptJobID) || ancestorSet.Contains(candidate.AttemptJobID) {
|
||||
continue
|
||||
}
|
||||
if !slices.Contains(candidate.Needs, cur.JobID) {
|
||||
continue
|
||||
}
|
||||
rerunSet.Add(candidate.AttemptJobID)
|
||||
queue = append(queue, candidate)
|
||||
}
|
||||
if !found {
|
||||
break
|
||||
|
||||
// escalate to parent caller as an ancestor so its own siblings get checked next round
|
||||
if cur.ParentJobID == 0 {
|
||||
continue
|
||||
}
|
||||
parent, ok := byID[cur.ParentJobID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if rerunSet.Contains(parent.AttemptJobID) || ancestorSet.Contains(parent.AttemptJobID) {
|
||||
continue
|
||||
}
|
||||
ancestorSet.Add(parent.AttemptJobID)
|
||||
queue = append(queue, parent)
|
||||
}
|
||||
|
||||
// remove entries whose parent-caller chain already has a rerunSet member
|
||||
for atID := range ancestorSet {
|
||||
cur := byAttemptJobID[atID]
|
||||
for cur.ParentJobID != 0 {
|
||||
parent, ok := byID[cur.ParentJobID]
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if rerunSet.Contains(parent.AttemptJobID) {
|
||||
delete(ancestorSet, atID)
|
||||
break
|
||||
}
|
||||
cur = parent
|
||||
}
|
||||
}
|
||||
|
||||
p.rerunJobIDs = rerunJobIDs
|
||||
p.rerunAttemptJobIDs = rerunSet
|
||||
p.ancestorAttemptJobIDs = ancestorSet
|
||||
return nil
|
||||
}
|
||||
|
||||
// hasRerunDependency reports whether `job` has a needs-reference that points to a job which is itself being rerun (in rerunAttemptJobIDs)
|
||||
// or is an ancestor caller whose subtree is being rerun (in ancestorAttemptJobIDs).
|
||||
// Either case means `job` should start in Blocked status.
|
||||
func (p *rerunPlan) hasRerunDependency(job *actions_model.ActionRunJob) bool {
|
||||
for _, need := range job.Needs {
|
||||
if p.rerunJobIDs.Contains(need) {
|
||||
if len(job.Needs) == 0 {
|
||||
return false
|
||||
}
|
||||
needSet := container.SetOf(job.Needs...)
|
||||
for _, sibling := range p.templateJobs {
|
||||
if sibling.ParentJobID != job.ParentJobID {
|
||||
continue
|
||||
}
|
||||
if !needSet.Contains(sibling.JobID) {
|
||||
continue
|
||||
}
|
||||
if p.rerunAttemptJobIDs.Contains(sibling.AttemptJobID) || p.ancestorAttemptJobIDs.Contains(sibling.AttemptJobID) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// collectResetCallerDescendants walks p.templateJobs and returns the DB IDs of every transitive descendant of any reusable caller whose AttemptJobID is in p.rerunAttemptJobIDs.
|
||||
// These descendants must NOT be cloned by execRerunPlan: the reset caller will re-insert them with template-matched AttemptJobIDs.
|
||||
func (p *rerunPlan) collectResetCallerDescendants() container.Set[int64] {
|
||||
out := make(container.Set[int64])
|
||||
for _, tj := range p.templateJobs {
|
||||
if !tj.IsReusableCaller || !p.rerunAttemptJobIDs.Contains(tj.AttemptJobID) {
|
||||
continue
|
||||
}
|
||||
// If this caller's row ID is already in `out`, it means an outer caller has already covered its whole subtree.
|
||||
// Skip the redundant walk.
|
||||
if out.Contains(tj.ID) {
|
||||
continue
|
||||
}
|
||||
for _, child := range actions_model.CollectAllDescendantJobs(tj, p.templateJobs) {
|
||||
out.Add(child.ID)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func cloneRunJobForAttempt(templateJob *actions_model.ActionRunJob, attempt *actions_model.ActionRunAttempt) *actions_model.ActionRunJob {
|
||||
return &actions_model.ActionRunJob{
|
||||
RunID: templateJob.RunID,
|
||||
@@ -355,6 +512,17 @@ func cloneRunJobForAttempt(templateJob *actions_model.ActionRunJob, attempt *act
|
||||
ConcurrencyGroup: templateJob.ConcurrencyGroup,
|
||||
ConcurrencyCancel: templateJob.ConcurrencyCancel,
|
||||
TokenPermissions: templateJob.TokenPermissions,
|
||||
|
||||
// reusable workflow fields
|
||||
IsReusableCaller: templateJob.IsReusableCaller,
|
||||
CallUses: templateJob.CallUses,
|
||||
ReusableWorkflowContent: slices.Clone(templateJob.ReusableWorkflowContent),
|
||||
CallSecrets: templateJob.CallSecrets,
|
||||
CallPayload: templateJob.CallPayload,
|
||||
IsExpanded: templateJob.IsExpanded,
|
||||
ParentJobID: templateJob.ParentJobID, // remapped by execRerunPlan
|
||||
WorkflowSourceRepoID: templateJob.WorkflowSourceRepoID,
|
||||
WorkflowSourceCommitSHA: templateJob.WorkflowSourceCommitSHA,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
actions_model "gitea.dev/models/actions"
|
||||
user_model "gitea.dev/models/user"
|
||||
"gitea.dev/modules/container"
|
||||
"gitea.dev/modules/util"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -100,3 +101,242 @@ func TestRerunValidation(t *testing.T) {
|
||||
assert.ErrorIs(t, err, util.ErrInvalidArgument)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRerunPlan(t *testing.T) {
|
||||
// "verify" appears in two scopes (inner caller under deploy, and top-level) so scope-blind matching would fail here.
|
||||
|
||||
// build id=101, attemptJobID=1
|
||||
// test id=102, attemptJobID=2, needs=[build]
|
||||
// deploy id=103, attemptJobID=3, caller
|
||||
// ├── validate id=104, attemptJobID=4, parent=103
|
||||
// ├── push id=105, attemptJobID=5, parent=103, needs=[validate]
|
||||
// ├── verify id=106, attemptJobID=6, parent=103, caller, needs=[push]
|
||||
// │ ├── smoke-test id=107, attemptJobID=7, parent=106
|
||||
// │ └── cleanup id=108, attemptJobID=8, parent=106, needs=[smoke-test]
|
||||
// └── finish-deploy id=109, attemptJobID=9, parent=103, needs=[verify]
|
||||
// verify id=110, attemptJobID=10, needs=[deploy] (top-level, same JobID)
|
||||
|
||||
buildJob := templateJob(101, 1, "build", 0, false)
|
||||
testJob := templateJob(102, 2, "test", 0, false, "build")
|
||||
deployJob := templateJob(103, 3, "deploy", 0, true)
|
||||
validateJob := templateJob(104, 4, "validate", 103, false)
|
||||
pushJob := templateJob(105, 5, "push", 103, false, "validate")
|
||||
verifyInnerJob := templateJob(106, 6, "verify", 103, true, "push")
|
||||
smokeTestJob := templateJob(107, 7, "smoke-test", 106, false)
|
||||
cleanupJob := templateJob(108, 8, "cleanup", 106, false, "smoke-test")
|
||||
finishDeployJob := templateJob(109, 9, "finish-deploy", 103, false, "verify")
|
||||
verifyTopJob := templateJob(110, 10, "verify", 0, false, "deploy")
|
||||
|
||||
jobs := []*actions_model.ActionRunJob{
|
||||
buildJob, testJob, deployJob, validateJob, pushJob,
|
||||
verifyInnerJob, smokeTestJob, cleanupJob,
|
||||
finishDeployJob, verifyTopJob,
|
||||
}
|
||||
|
||||
t.Run("ExpandRerunJobIDs", func(t *testing.T) {
|
||||
t.Run("empty jobsToRerun reruns every template job, no ancestors", func(t *testing.T) {
|
||||
plan := &rerunPlan{templateJobs: jobs}
|
||||
require.NoError(t, plan.expandRerunJobIDs(nil))
|
||||
|
||||
assert.ElementsMatch(t, attemptJobIDsOf(jobs...), plan.rerunAttemptJobIDs.Values())
|
||||
assert.Empty(t, plan.ancestorAttemptJobIDs)
|
||||
})
|
||||
|
||||
t.Run("same-scope downstream BFS pulls in dependents", func(t *testing.T) {
|
||||
// a -> b -> c (chain), d unrelated.
|
||||
a := templateJob(101, 1, "a", 0, false)
|
||||
b := templateJob(102, 2, "b", 0, false, "a")
|
||||
c := templateJob(103, 3, "c", 0, false, "b")
|
||||
d := templateJob(104, 4, "d", 0, false)
|
||||
plan := &rerunPlan{templateJobs: []*actions_model.ActionRunJob{a, b, c, d}}
|
||||
require.NoError(t, plan.expandRerunJobIDs([]*actions_model.ActionRunJob{a}))
|
||||
|
||||
assert.ElementsMatch(t, attemptJobIDsOf(a, b, c), plan.rerunAttemptJobIDs.Values())
|
||||
assert.Empty(t, plan.ancestorAttemptJobIDs)
|
||||
})
|
||||
|
||||
t.Run("rerun a deep child escalates across reusable scopes", func(t *testing.T) {
|
||||
plan := &rerunPlan{templateJobs: jobs}
|
||||
require.NoError(t, plan.expandRerunJobIDs([]*actions_model.ActionRunJob{smokeTestJob}))
|
||||
|
||||
// rerun: smoke-test (selected), cleanup (same-scope downstream),
|
||||
// finish-deploy (deploy-scope sibling of inner verify ancestor),
|
||||
// top-level verify (top-scope sibling of deploy ancestor).
|
||||
assert.ElementsMatch(t,
|
||||
attemptJobIDsOf(smokeTestJob, cleanupJob, finishDeployJob, verifyTopJob),
|
||||
plan.rerunAttemptJobIDs.Values())
|
||||
|
||||
// ancestors: inner verify and deploy
|
||||
assert.ElementsMatch(t, attemptJobIDsOf(verifyInnerJob, deployJob), plan.ancestorAttemptJobIDs.Values())
|
||||
})
|
||||
|
||||
t.Run("rerun a top-level caller resets only itself and same-scope dependents", func(t *testing.T) {
|
||||
plan := &rerunPlan{templateJobs: jobs}
|
||||
require.NoError(t, plan.expandRerunJobIDs([]*actions_model.ActionRunJob{deployJob}))
|
||||
|
||||
// rerun: deploy (selected) + top-level verify (needs:[deploy]).
|
||||
assert.ElementsMatch(t, attemptJobIDsOf(deployJob, verifyTopJob), plan.rerunAttemptJobIDs.Values())
|
||||
// deploy is top-level so no ancestors.
|
||||
assert.Empty(t, plan.ancestorAttemptJobIDs)
|
||||
})
|
||||
|
||||
t.Run("rerun a nested caller escalates one level", func(t *testing.T) {
|
||||
plan := &rerunPlan{templateJobs: jobs}
|
||||
require.NoError(t, plan.expandRerunJobIDs([]*actions_model.ActionRunJob{verifyInnerJob}))
|
||||
|
||||
// inner verify (selected) -> finish-deploy (deploy-scope dep) -> top-level verify (top-scope dep of deploy).
|
||||
assert.ElementsMatch(t,
|
||||
attemptJobIDsOf(verifyInnerJob, finishDeployJob, verifyTopJob),
|
||||
plan.rerunAttemptJobIDs.Values())
|
||||
// deploy is the only ancestor (one level up from inner verify).
|
||||
assert.ElementsMatch(t, attemptJobIDsOf(deployJob), plan.ancestorAttemptJobIDs.Values())
|
||||
})
|
||||
|
||||
t.Run("selecting one same-name job leaves the other-scope same-name job alone", func(t *testing.T) {
|
||||
// Selecting the top-level "verify" must not pull in the same-named inner one or its descendants.
|
||||
plan := &rerunPlan{templateJobs: jobs}
|
||||
require.NoError(t, plan.expandRerunJobIDs([]*actions_model.ActionRunJob{verifyTopJob}))
|
||||
|
||||
// Only the top-level verify is rerun.
|
||||
assert.ElementsMatch(t, attemptJobIDsOf(verifyTopJob), plan.rerunAttemptJobIDs.Values())
|
||||
assert.Empty(t, plan.ancestorAttemptJobIDs)
|
||||
})
|
||||
|
||||
t.Run("a caller is rerun when a sibling it needs is selected", func(t *testing.T) {
|
||||
plan := &rerunPlan{templateJobs: jobs}
|
||||
require.NoError(t, plan.expandRerunJobIDs([]*actions_model.ActionRunJob{pushJob}))
|
||||
|
||||
assert.ElementsMatch(t,
|
||||
attemptJobIDsOf(pushJob, verifyInnerJob, finishDeployJob, verifyTopJob),
|
||||
plan.rerunAttemptJobIDs.Values())
|
||||
assert.ElementsMatch(t, attemptJobIDsOf(deployJob), plan.ancestorAttemptJobIDs.Values())
|
||||
|
||||
// Confirm the downstream effect: verify(inner) is a reset caller, so its children's DB row IDs are marked for skip-clone.
|
||||
assert.ElementsMatch(t, rowIDsOf(smokeTestJob, cleanupJob), plan.collectResetCallerDescendants().Values())
|
||||
})
|
||||
|
||||
t.Run("multiple selections converge", func(t *testing.T) {
|
||||
plan := &rerunPlan{templateJobs: jobs}
|
||||
require.NoError(t, plan.expandRerunJobIDs([]*actions_model.ActionRunJob{deployJob, smokeTestJob}))
|
||||
|
||||
assert.ElementsMatch(t, attemptJobIDsOf(deployJob, smokeTestJob, cleanupJob, finishDeployJob, verifyTopJob), plan.rerunAttemptJobIDs.Values())
|
||||
assert.Empty(t, plan.ancestorAttemptJobIDs)
|
||||
assert.ElementsMatch(t,
|
||||
rowIDsOf(validateJob, pushJob, verifyInnerJob, smokeTestJob, cleanupJob, finishDeployJob),
|
||||
plan.collectResetCallerDescendants().Values())
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("CollectResetCallerDescendants", func(t *testing.T) {
|
||||
planWith := func(rerunJobs ...*actions_model.ActionRunJob) *rerunPlan {
|
||||
set := make(container.Set[int64])
|
||||
for _, j := range rerunJobs {
|
||||
set.Add(j.AttemptJobID)
|
||||
}
|
||||
return &rerunPlan{templateJobs: jobs, rerunAttemptJobIDs: set}
|
||||
}
|
||||
|
||||
t.Run("non-caller in reset set is ignored", func(t *testing.T) {
|
||||
assert.Empty(t, planWith(smokeTestJob).collectResetCallerDescendants())
|
||||
})
|
||||
|
||||
t.Run("caller in reset set returns transitive descendants", func(t *testing.T) {
|
||||
out := planWith(deployJob).collectResetCallerDescendants()
|
||||
assert.ElementsMatch(t,
|
||||
rowIDsOf(validateJob, pushJob, verifyInnerJob, smokeTestJob, cleanupJob, finishDeployJob),
|
||||
out.Values())
|
||||
})
|
||||
|
||||
t.Run("multiple reset callers union their descendants", func(t *testing.T) {
|
||||
out := planWith(deployJob, verifyInnerJob).collectResetCallerDescendants()
|
||||
assert.ElementsMatch(t,
|
||||
rowIDsOf(validateJob, pushJob, verifyInnerJob, smokeTestJob, cleanupJob, finishDeployJob),
|
||||
out.Values())
|
||||
})
|
||||
|
||||
t.Run("nested-only reset returns just the nested subtree", func(t *testing.T) {
|
||||
out := planWith(verifyInnerJob).collectResetCallerDescendants()
|
||||
assert.ElementsMatch(t, rowIDsOf(smokeTestJob, cleanupJob), out.Values())
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("HasRerunDependency", func(t *testing.T) {
|
||||
t.Run("no needs returns false", func(t *testing.T) {
|
||||
plan := &rerunPlan{
|
||||
templateJobs: []*actions_model.ActionRunJob{buildJob},
|
||||
rerunAttemptJobIDs: make(container.Set[int64]),
|
||||
ancestorAttemptJobIDs: make(container.Set[int64]),
|
||||
}
|
||||
assert.False(t, plan.hasRerunDependency(buildJob))
|
||||
})
|
||||
|
||||
t.Run("dependency in rerun set returns true", func(t *testing.T) {
|
||||
plan := &rerunPlan{
|
||||
templateJobs: jobs,
|
||||
rerunAttemptJobIDs: container.SetOf(smokeTestJob.AttemptJobID),
|
||||
ancestorAttemptJobIDs: make(container.Set[int64]),
|
||||
}
|
||||
// cleanup `needs: [smoke-test]`, both in inner verify scope.
|
||||
assert.True(t, plan.hasRerunDependency(cleanupJob))
|
||||
})
|
||||
|
||||
t.Run("dependency in ancestor set returns true", func(t *testing.T) {
|
||||
plan := &rerunPlan{
|
||||
templateJobs: jobs,
|
||||
rerunAttemptJobIDs: container.SetOf(attemptJobIDsOf(smokeTestJob, cleanupJob)...),
|
||||
ancestorAttemptJobIDs: container.SetOf(verifyInnerJob.AttemptJobID),
|
||||
}
|
||||
assert.True(t, plan.hasRerunDependency(finishDeployJob))
|
||||
})
|
||||
|
||||
t.Run("dependency on unrelated sibling returns false", func(t *testing.T) {
|
||||
plan := &rerunPlan{
|
||||
templateJobs: jobs,
|
||||
rerunAttemptJobIDs: container.SetOf(smokeTestJob.AttemptJobID),
|
||||
ancestorAttemptJobIDs: make(container.Set[int64]),
|
||||
}
|
||||
assert.False(t, plan.hasRerunDependency(pushJob))
|
||||
})
|
||||
|
||||
t.Run("scope-bound: same JobID in another scope does not match", func(t *testing.T) {
|
||||
plan := &rerunPlan{
|
||||
templateJobs: jobs,
|
||||
rerunAttemptJobIDs: container.SetOf(verifyTopJob.AttemptJobID),
|
||||
ancestorAttemptJobIDs: make(container.Set[int64]),
|
||||
}
|
||||
assert.False(t, plan.hasRerunDependency(finishDeployJob))
|
||||
|
||||
// Sanity: swap to the inner verify and the same target now sees it.
|
||||
plan.rerunAttemptJobIDs = container.SetOf(verifyInnerJob.AttemptJobID)
|
||||
assert.True(t, plan.hasRerunDependency(finishDeployJob))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// templateJob is a small constructor for fixture jobs used by the rerunPlan unit tests.
|
||||
func templateJob(id, attemptJobID int64, jobID string, parentID int64, isCaller bool, needs ...string) *actions_model.ActionRunJob {
|
||||
return &actions_model.ActionRunJob{
|
||||
ID: id,
|
||||
AttemptJobID: attemptJobID,
|
||||
JobID: jobID,
|
||||
ParentJobID: parentID,
|
||||
IsReusableCaller: isCaller,
|
||||
Needs: needs,
|
||||
}
|
||||
}
|
||||
|
||||
func attemptJobIDsOf(jobs ...*actions_model.ActionRunJob) []int64 {
|
||||
out := make([]int64, len(jobs))
|
||||
for i, j := range jobs {
|
||||
out[i] = j.AttemptJobID
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func rowIDsOf(jobs ...*actions_model.ActionRunJob) []int64 {
|
||||
out := make([]int64, len(jobs))
|
||||
for i, j := range jobs {
|
||||
out[i] = j.ID
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -0,0 +1,342 @@
|
||||
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
actions_model "gitea.dev/models/actions"
|
||||
"gitea.dev/models/db"
|
||||
perm_model "gitea.dev/models/perm"
|
||||
access_model "gitea.dev/models/perm/access"
|
||||
repo_model "gitea.dev/models/repo"
|
||||
"gitea.dev/modules/actions/jobparser"
|
||||
"gitea.dev/modules/container"
|
||||
"gitea.dev/modules/gitrepo"
|
||||
"gitea.dev/modules/json"
|
||||
api "gitea.dev/modules/structs"
|
||||
"gitea.dev/modules/util"
|
||||
"gitea.dev/services/convert"
|
||||
|
||||
"xorm.io/builder"
|
||||
)
|
||||
|
||||
// MaxReusableCallLevels caps how deep a reusable workflow can nest:
|
||||
// a top-level caller may have at most MaxReusableCallLevels nested callers below it.
|
||||
const MaxReusableCallLevels = 9
|
||||
|
||||
// loadReusableWorkflowSource resolves the workflow file referenced by a caller's `uses:` and returns its raw bytes,
|
||||
// along with the (repo_id, commit_sha) the file was loaded from.
|
||||
func loadReusableWorkflowSource(ctx context.Context, run *actions_model.ActionRun, caller *actions_model.ActionRunJob, ref *jobparser.UsesRef) (content []byte, sourceRepoID int64, sourceCommitSHA string, err error) {
|
||||
if err := run.LoadAttributes(ctx); err != nil {
|
||||
return nil, 0, "", err
|
||||
}
|
||||
|
||||
switch ref.Kind {
|
||||
case jobparser.UsesKindLocalSameRepo:
|
||||
// `./` is resolved against the workflow file containing the `uses:` - i.e. the caller's own source repo + commit.
|
||||
callerRepo, err := repo_model.GetRepositoryByID(ctx, caller.WorkflowSourceRepoID)
|
||||
if err != nil {
|
||||
return nil, 0, "", fmt.Errorf("look up caller source repo %d: %w", caller.WorkflowSourceRepoID, err)
|
||||
}
|
||||
bytes, resolvedSHA, err := readWorkflowFromRepo(ctx, callerRepo, caller.WorkflowSourceCommitSHA, ref.Path)
|
||||
if err != nil {
|
||||
return nil, 0, "", err
|
||||
}
|
||||
return bytes, callerRepo.ID, resolvedSHA, nil
|
||||
|
||||
case jobparser.UsesKindLocalCrossRepo:
|
||||
repo, err := repo_model.GetRepositoryByOwnerAndName(ctx, ref.Owner, ref.Repo)
|
||||
if err != nil {
|
||||
return nil, 0, "", fmt.Errorf("look up cross-repo workflow source %q: %w", ref.Owner+"/"+ref.Repo, err)
|
||||
}
|
||||
ok, err := access_model.CanReadWorkflowCrossRepo(ctx, repo, run)
|
||||
if err != nil {
|
||||
return nil, 0, "", err
|
||||
}
|
||||
if !ok {
|
||||
return nil, 0, "", fmt.Errorf("no permission to read reusable workflow from %s/%s", ref.Owner, ref.Repo)
|
||||
}
|
||||
bytes, resolvedSHA, err := readWorkflowFromRepo(ctx, repo, ref.Ref, ref.Path)
|
||||
if err != nil {
|
||||
return nil, 0, "", err
|
||||
}
|
||||
return bytes, repo.ID, resolvedSHA, nil
|
||||
}
|
||||
return nil, 0, "", fmt.Errorf("unsupported uses kind %d", ref.Kind)
|
||||
}
|
||||
|
||||
// readWorkflowFromRepo loads a workflow file from `repo` at `refOrSHA` and returns its content plus the resolved commit SHA.
|
||||
func readWorkflowFromRepo(ctx context.Context, repo *repo_model.Repository, refOrSHA, path string) ([]byte, string, error) {
|
||||
gitRepo, err := gitrepo.OpenRepository(ctx, repo)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("open repo %s: %w", repo.FullName(), err)
|
||||
}
|
||||
defer gitRepo.Close()
|
||||
|
||||
commit, err := gitRepo.GetCommit(refOrSHA)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("get commit %q in %s: %w", refOrSHA, repo.FullName(), err)
|
||||
}
|
||||
str, err := commit.GetFileContent(path, 1024*1024)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("read %s@%s:%s: %w", repo.FullName(), refOrSHA, path, err)
|
||||
}
|
||||
return []byte(str), commit.ID.String(), nil
|
||||
}
|
||||
|
||||
// checkCallerChain walks `caller`'s ancestor chain (via ParentJobID) and:
|
||||
// - rejects cycles (caller.CallUses appearing in any ancestor's CallUses)
|
||||
// - enforces MaxReusableCallLevels on the number of ancestors above `caller`
|
||||
//
|
||||
// Cycle detection is intentionally *syntactic* (string equality on CallUses), not semantic.
|
||||
// So `owner/repo/lib.yml@v1` and `owner/repo/lib.yml@refs/heads/v1` resolving to the same commit are NOT treated as the same node.
|
||||
// Going semantic (Owner, Repo, Path, ResolvedSHA tuples) would require extra git reads.
|
||||
func checkCallerChain(ctx context.Context, caller *actions_model.ActionRunJob) error {
|
||||
if caller.ParentJobID == 0 {
|
||||
return nil // top-level caller: depth 0, no ancestors to walk
|
||||
}
|
||||
|
||||
visited := make(container.Set[string])
|
||||
visited.Add(caller.CallUses)
|
||||
|
||||
depth := 0
|
||||
current := caller
|
||||
for current.ParentJobID != 0 {
|
||||
next, err := actions_model.GetRunJobByRunAndID(ctx, current.RunID, current.ParentJobID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("walk caller chain: %w", err)
|
||||
}
|
||||
current = next
|
||||
depth++
|
||||
if depth > MaxReusableCallLevels {
|
||||
return fmt.Errorf("reusable workflow call exceeds the maximum nesting level of %d at %q", MaxReusableCallLevels, caller.CallUses)
|
||||
}
|
||||
if current.IsReusableCaller && current.CallUses != "" {
|
||||
if visited.Contains(current.CallUses) {
|
||||
return fmt.Errorf("reusable workflow call cycle detected: %q", current.CallUses)
|
||||
}
|
||||
visited.Add(current.CallUses)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// expandReusableWorkflowCaller loads and parses the target reusable workflow and inserts the caller's direct child jobs.
|
||||
// It expands only ONE level: a child that is itself a reusable caller is inserted Blocked and expanded later by a subsequent resolver pass.
|
||||
// It does NOT schedule a follow-up resolver pass; the caller of this function is responsible for emitting.
|
||||
//
|
||||
// All call sites (PrepareRunAndInsert, execRerunPlan, checkJobsOfCurrentRunAttempt, ApproveRuns) invoke this inside their enclosing write transaction,
|
||||
// because the caller row update and the child-row inserts must commit atomically.
|
||||
// Be aware this is not cheap inside a tx: it does a git read, YAML parsing, and `${{ }}` expression evaluation.
|
||||
// None of the call sites is hot: each caller is expanded once per attempt.
|
||||
func expandReusableWorkflowCaller(ctx context.Context, run *actions_model.ActionRun, attempt *actions_model.ActionRunAttempt, caller *actions_model.ActionRunJob, vars map[string]string) error {
|
||||
// Already expanded by an earlier call, skip
|
||||
if caller.IsExpanded {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 1. Cycle + depth check via the ParentJobID chain.
|
||||
if err := checkCallerChain(ctx, caller); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 2. Parse the caller's own job (Uses, With, RawSecrets) from its WorkflowPayload.
|
||||
parsedJob, err := caller.ParseJob()
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse caller job %d: %w", caller.ID, err)
|
||||
}
|
||||
|
||||
// 3. Load called-workflow source.
|
||||
ref, err := jobparser.ParseUses(parsedJob.Uses)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse uses %q: %w", parsedJob.Uses, err)
|
||||
}
|
||||
content, contentSourceRepoID, contentSourceCommitSHA, err := loadReusableWorkflowSource(ctx, run, caller, ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 4. Parse the called workflow's spec (used by both secret validation and input evaluation).
|
||||
wcSpec, err := jobparser.ParseWorkflowCallSpec(content)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse called workflow spec: %w", err)
|
||||
}
|
||||
|
||||
// 5. Resolve caller's `secrets:` and validate it against the callee's schema.
|
||||
inherit, secretsMap, err := jobparser.ParseCallerSecrets(parsedJob.RawSecrets)
|
||||
if err != nil {
|
||||
return fmt.Errorf("caller secrets %q: %w", caller.JobID, err)
|
||||
}
|
||||
// Under `secrets: inherit` the caller forwards all of its own secrets verbatim and does NOT name them individually,
|
||||
// so required-secret presence cannot be verified at expansion time and a missing required secret will surface at job runtime.
|
||||
// This matches GitHub Actions' behavior.
|
||||
if !inherit {
|
||||
if err := jobparser.ValidateCallerSecrets(wcSpec, secretsMap); err != nil {
|
||||
return fmt.Errorf("caller %q secrets: %w", caller.JobID, err)
|
||||
}
|
||||
}
|
||||
switch {
|
||||
case inherit:
|
||||
caller.CallSecrets = jobparser.SecretsInherit
|
||||
case len(secretsMap) > 0:
|
||||
mapBytes, err := json.Marshal(secretsMap)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal caller secret map: %w", err)
|
||||
}
|
||||
caller.CallSecrets = string(mapBytes)
|
||||
}
|
||||
caller.ReusableWorkflowContent = content
|
||||
|
||||
// 6. Evaluate caller's `with:`, then match against the callee schema.
|
||||
workflowCallInputs := map[string]any{}
|
||||
if len(wcSpec.Inputs) > 0 {
|
||||
jobResults, err := findJobNeedsAndFillJobResults(ctx, caller)
|
||||
if err != nil {
|
||||
return fmt.Errorf("find caller needs: %w", err)
|
||||
}
|
||||
parentInputs, err := getInputsForJob(ctx, run, caller)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
callerGitCtx := GenerateGiteaContext(ctx, run, attempt, caller)
|
||||
evaluated, err := jobparser.EvaluateCallerWith(
|
||||
caller.JobID, parsedJob,
|
||||
callerGitCtx, jobResults, vars, parentInputs,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("evaluate caller with: %w", err)
|
||||
}
|
||||
workflowCallInputs, err = jobparser.MatchCallerInputsAgainstSpec(wcSpec, evaluated)
|
||||
if err != nil {
|
||||
return fmt.Errorf("caller %q inputs: %w", caller.JobID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// 7. Build CallPayload (persisted in step 9).
|
||||
callPayload, err := (&api.WorkflowCallPayload{
|
||||
Workflow: run.WorkflowID,
|
||||
Ref: run.Ref,
|
||||
Repository: convert.ToRepo(ctx, run.Repo, access_model.Permission{AccessMode: perm_model.AccessModeNone}),
|
||||
Sender: convert.ToUserWithAccessMode(ctx, run.TriggerUser, perm_model.AccessModeNone),
|
||||
Inputs: workflowCallInputs,
|
||||
}).JSONPayload()
|
||||
if err != nil {
|
||||
return fmt.Errorf("build call payload: %w", err)
|
||||
}
|
||||
|
||||
// 8. Insert direct children of this caller.
|
||||
existingChildren, err := actions_model.GetDirectChildJobsByParent(ctx, caller)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get existing children of caller %d: %w", caller.ID, err)
|
||||
}
|
||||
if len(existingChildren) > 0 {
|
||||
// Should not happen - child jobs cannot be expanded before the caller gets ready
|
||||
return fmt.Errorf("invariant violation: caller %d has %d pre-existing children", caller.ID, len(existingChildren))
|
||||
}
|
||||
if err := insertCallerChildren(ctx, run, attempt, caller, content, contentSourceRepoID, contentSourceCommitSHA, vars, workflowCallInputs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 9. Update caller-related cols.
|
||||
caller.CallPayload = string(callPayload)
|
||||
caller.IsExpanded = true
|
||||
n, err := actions_model.UpdateRunJob(ctx, caller,
|
||||
builder.Eq{"is_expanded": false},
|
||||
"call_secrets", "reusable_workflow_content", "call_payload", "is_expanded")
|
||||
if err != nil {
|
||||
return fmt.Errorf("commit caller %d expansion: %w", caller.ID, err)
|
||||
}
|
||||
if n == 0 {
|
||||
return fmt.Errorf("caller %d already expanded by another writer", caller.ID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// insertCallerChildren parses the called workflow with the caller's resolved inputs and inserts each parsed job.
|
||||
func insertCallerChildren(ctx context.Context, run *actions_model.ActionRun, attempt *actions_model.ActionRunAttempt, caller *actions_model.ActionRunJob, content []byte, sourceRepoID int64, sourceCommitSHA string, vars map[string]string, inputs map[string]any) error {
|
||||
// Parse the called workflow with the caller's `inputs`
|
||||
gitCtx := GenerateGiteaContext(ctx, run, attempt, nil)
|
||||
if event, ok := gitCtx["event"].(map[string]any); ok {
|
||||
event["inputs"] = inputs
|
||||
}
|
||||
gitCtx["event_name"] = "workflow_call"
|
||||
|
||||
childWorkflows, err := jobparser.Parse(content,
|
||||
jobparser.WithVars(vars),
|
||||
jobparser.WithGitContext(gitCtx.ToGitHubContext()),
|
||||
jobparser.WithInputs(inputs),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse called workflow for caller %d: %w", caller.ID, err)
|
||||
}
|
||||
if len(childWorkflows) == 0 {
|
||||
return fmt.Errorf("called workflow for caller %d (uses %q) has no jobs", caller.ID, caller.CallUses)
|
||||
}
|
||||
|
||||
priorChildren, err := actions_model.GetPriorAttemptChildrenByParent(ctx, run.ID, attempt.ID, caller.AttemptJobID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("lookup prior-attempt children of caller %d: %w", caller.ID, err)
|
||||
}
|
||||
|
||||
for _, sw := range childWorkflows {
|
||||
jobID, parsedChild := sw.Job()
|
||||
if parsedChild == nil {
|
||||
continue
|
||||
}
|
||||
needs := parsedChild.Needs()
|
||||
if err := sw.SetJob(jobID, parsedChild.EraseNeeds()); err != nil {
|
||||
return err
|
||||
}
|
||||
payload, err := sw.Marshal()
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal child %q under caller %d: %w", jobID, caller.ID, err)
|
||||
}
|
||||
|
||||
parsedChild.Name = util.EllipsisDisplayString(parsedChild.Name, 255)
|
||||
|
||||
// AttemptJobID: prefer a prior-attempt match by (JobID, Name) and fall back to a fresh allocator value for newly-appearing logical jobs.
|
||||
// The two-level key disambiguates matrix instances (same JobID, different Names) and distinct jobs that legally share the same Name (different JobIDs).
|
||||
var attemptJobID int64
|
||||
if priorChild, ok := priorChildren[jobID][parsedChild.Name]; ok {
|
||||
attemptJobID = priorChild.AttemptJobID
|
||||
} else {
|
||||
attemptJobID, err = actions_model.GetNextAttemptJobID(ctx, run.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("alloc attempt_job_id for child %q: %w", jobID, err)
|
||||
}
|
||||
}
|
||||
child := &actions_model.ActionRunJob{
|
||||
RunID: run.ID,
|
||||
RunAttemptID: attempt.ID,
|
||||
RepoID: run.RepoID,
|
||||
OwnerID: run.OwnerID,
|
||||
CommitSHA: run.CommitSHA,
|
||||
IsForkPullRequest: run.IsForkPullRequest,
|
||||
Name: parsedChild.Name,
|
||||
Attempt: attempt.Attempt,
|
||||
WorkflowPayload: payload,
|
||||
JobID: jobID,
|
||||
AttemptJobID: attemptJobID,
|
||||
Needs: needs,
|
||||
RunsOn: parsedChild.RunsOn(),
|
||||
Status: actions_model.StatusBlocked,
|
||||
ParentJobID: caller.ID,
|
||||
WorkflowSourceRepoID: sourceRepoID,
|
||||
WorkflowSourceCommitSHA: sourceCommitSHA,
|
||||
}
|
||||
if perms := ExtractJobPermissionsFromWorkflow(sw, parsedChild); perms != nil {
|
||||
child.TokenPermissions = perms
|
||||
}
|
||||
if parsedChild.Uses != "" {
|
||||
child.IsReusableCaller = true
|
||||
child.CallUses = parsedChild.Uses
|
||||
}
|
||||
if err := db.Insert(ctx, child); err != nil {
|
||||
return fmt.Errorf("insert child %q under caller %d: %w", jobID, caller.ID, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,134 @@
|
||||
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
actions_model "gitea.dev/models/actions"
|
||||
"gitea.dev/models/db"
|
||||
"gitea.dev/models/unittest"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCheckCallerChain_Cycle(t *testing.T) {
|
||||
t.Run("DirectCycle", func(t *testing.T) {
|
||||
require.NoError(t, unittest.PrepareTestDatabase())
|
||||
// A -> A: leaf's CallUses matches its direct parent's.
|
||||
chain := buildCallerChain(t,
|
||||
"./.gitea/workflows/a.yml",
|
||||
"./.gitea/workflows/a.yml",
|
||||
)
|
||||
err := checkCallerChain(t.Context(), chain[len(chain)-1])
|
||||
assert.ErrorContains(t, err, "cycle detected")
|
||||
})
|
||||
|
||||
t.Run("IndirectCycle", func(t *testing.T) {
|
||||
require.NoError(t, unittest.PrepareTestDatabase())
|
||||
// A -> B -> A: leaf's CallUses matches its grandparent's.
|
||||
chain := buildCallerChain(t,
|
||||
"./.gitea/workflows/a.yml",
|
||||
"./.gitea/workflows/b.yml",
|
||||
"./.gitea/workflows/a.yml",
|
||||
)
|
||||
err := checkCallerChain(t.Context(), chain[len(chain)-1])
|
||||
assert.ErrorContains(t, err, "cycle detected")
|
||||
})
|
||||
|
||||
t.Run("NoCycle", func(t *testing.T) {
|
||||
require.NoError(t, unittest.PrepareTestDatabase())
|
||||
// Sanity: linear chain with distinct CallUses must not trip cycle detection.
|
||||
chain := buildCallerChain(t,
|
||||
"./.gitea/workflows/a.yml",
|
||||
"./.gitea/workflows/b.yml",
|
||||
"./.gitea/workflows/c.yml",
|
||||
)
|
||||
require.NoError(t, checkCallerChain(t.Context(), chain[len(chain)-1]))
|
||||
})
|
||||
}
|
||||
|
||||
func TestCheckCallerChain_DepthLimit(t *testing.T) {
|
||||
// top + MaxReusableCallLevels nested callers is the longest accepted; one more exceeds the limit.
|
||||
makeDistinctUses := func(n int) []string {
|
||||
out := make([]string, n)
|
||||
for i := range out {
|
||||
out[i] = fmt.Sprintf("./.gitea/workflows/level%d.yml", i)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
t.Run("ExactlyAtLimit", func(t *testing.T) {
|
||||
require.NoError(t, unittest.PrepareTestDatabase())
|
||||
chain := buildCallerChain(t, makeDistinctUses(MaxReusableCallLevels+1)...)
|
||||
require.NoError(t, checkCallerChain(t.Context(), chain[len(chain)-1]))
|
||||
})
|
||||
|
||||
t.Run("OneOverLimit", func(t *testing.T) {
|
||||
require.NoError(t, unittest.PrepareTestDatabase())
|
||||
chain := buildCallerChain(t, makeDistinctUses(MaxReusableCallLevels+2)...)
|
||||
err := checkCallerChain(t.Context(), chain[len(chain)-1])
|
||||
assert.ErrorContains(t, err, "exceeds the maximum nesting level")
|
||||
})
|
||||
}
|
||||
|
||||
// buildCallerChain inserts a linear chain of reusable caller jobs in a single run+attempt.
|
||||
// callerUses[0] is the top-level caller (ParentJobID=0); each subsequent caller is inserted as a child of the previous one.
|
||||
// Returns the inserted jobs in order (index 0 = top, last = leaf).
|
||||
func buildCallerChain(t *testing.T, callerUses ...string) []*actions_model.ActionRunJob {
|
||||
t.Helper()
|
||||
require.NotEmpty(t, callerUses)
|
||||
ctx := t.Context()
|
||||
|
||||
run := &actions_model.ActionRun{
|
||||
Title: "caller-chain-test",
|
||||
RepoID: 4,
|
||||
OwnerID: 1,
|
||||
Index: 9601,
|
||||
WorkflowID: "test.yaml",
|
||||
TriggerUserID: 1,
|
||||
Ref: "refs/heads/master",
|
||||
CommitSHA: "c2d72f548424103f01ee1dc02889c1e2bff816b0",
|
||||
Event: "push",
|
||||
TriggerEvent: "push",
|
||||
EventPayload: "{}",
|
||||
Status: actions_model.StatusRunning,
|
||||
}
|
||||
require.NoError(t, db.Insert(ctx, run))
|
||||
|
||||
attempt := &actions_model.ActionRunAttempt{
|
||||
RepoID: run.RepoID,
|
||||
RunID: run.ID,
|
||||
Attempt: 1,
|
||||
TriggerUserID: 1,
|
||||
Status: actions_model.StatusRunning,
|
||||
}
|
||||
require.NoError(t, db.Insert(ctx, attempt))
|
||||
|
||||
jobs := make([]*actions_model.ActionRunJob, 0, len(callerUses))
|
||||
parentID := int64(0)
|
||||
for i, uses := range callerUses {
|
||||
job := &actions_model.ActionRunJob{
|
||||
RunID: run.ID,
|
||||
RunAttemptID: attempt.ID,
|
||||
RepoID: run.RepoID,
|
||||
OwnerID: run.OwnerID,
|
||||
CommitSHA: run.CommitSHA,
|
||||
Name: fmt.Sprintf("caller-%d", i),
|
||||
JobID: fmt.Sprintf("caller-%d", i),
|
||||
Attempt: 1,
|
||||
Status: actions_model.StatusBlocked,
|
||||
AttemptJobID: int64(i + 1),
|
||||
IsReusableCaller: true,
|
||||
CallUses: uses,
|
||||
ParentJobID: parentID,
|
||||
}
|
||||
require.NoError(t, db.Insert(ctx, job))
|
||||
jobs = append(jobs, job)
|
||||
parentID = job.ID
|
||||
}
|
||||
return jobs
|
||||
}
|
||||
+51
-16
@@ -10,6 +10,7 @@ import (
|
||||
actions_model "gitea.dev/models/actions"
|
||||
"gitea.dev/models/db"
|
||||
"gitea.dev/modules/actions/jobparser"
|
||||
"gitea.dev/modules/log"
|
||||
"gitea.dev/modules/util"
|
||||
|
||||
act_model "gitea.com/gitea/runner/act/model"
|
||||
@@ -55,6 +56,7 @@ func PrepareRunAndInsert(ctx context.Context, content []byte, run *actions_model
|
||||
// The title will be cut off at 255 characters if it's longer than 255 characters.
|
||||
func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte, vars map[string]string, inputs map[string]any, wfRawConcurrency *act_model.RawConcurrency) error {
|
||||
var cancelledConcurrencyJobs []*actions_model.ActionRunJob
|
||||
var hasWaitingCallerJobs bool
|
||||
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
||||
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
|
||||
if err != nil {
|
||||
@@ -128,7 +130,7 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte
|
||||
runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs))
|
||||
var hasWaitingJobs bool
|
||||
|
||||
for i, v := range jobs {
|
||||
for _, v := range jobs {
|
||||
id, job := v.Job()
|
||||
needs := job.Needs()
|
||||
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
|
||||
@@ -136,30 +138,43 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte
|
||||
}
|
||||
payload, _ := v.Marshal()
|
||||
|
||||
isReusableWorkflowCaller := job.Uses != ""
|
||||
shouldBlockJob := runAttempt.Status == actions_model.StatusBlocked || len(needs) > 0 || run.NeedApproval
|
||||
|
||||
attemptJobID, err := actions_model.GetNextAttemptJobID(ctx, run.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("alloc attempt_job_id: %w", err)
|
||||
}
|
||||
|
||||
job.Name = util.EllipsisDisplayString(job.Name, 255)
|
||||
runJob := &actions_model.ActionRunJob{
|
||||
RunID: run.ID,
|
||||
RunAttemptID: runAttempt.ID,
|
||||
RepoID: run.RepoID,
|
||||
OwnerID: run.OwnerID,
|
||||
CommitSHA: run.CommitSHA,
|
||||
IsForkPullRequest: run.IsForkPullRequest,
|
||||
Name: job.Name,
|
||||
Attempt: runAttempt.Attempt,
|
||||
WorkflowPayload: payload,
|
||||
JobID: id,
|
||||
AttemptJobID: int64(i + 1),
|
||||
Needs: needs,
|
||||
RunsOn: job.RunsOn(),
|
||||
Status: util.Iif(shouldBlockJob, actions_model.StatusBlocked, actions_model.StatusWaiting),
|
||||
RunID: run.ID,
|
||||
RunAttemptID: runAttempt.ID,
|
||||
RepoID: run.RepoID,
|
||||
OwnerID: run.OwnerID,
|
||||
CommitSHA: run.CommitSHA,
|
||||
IsForkPullRequest: run.IsForkPullRequest,
|
||||
Name: job.Name,
|
||||
Attempt: runAttempt.Attempt,
|
||||
WorkflowPayload: payload,
|
||||
JobID: id,
|
||||
AttemptJobID: attemptJobID,
|
||||
Needs: needs,
|
||||
RunsOn: job.RunsOn(),
|
||||
Status: util.Iif(shouldBlockJob, actions_model.StatusBlocked, actions_model.StatusWaiting),
|
||||
WorkflowSourceRepoID: run.RepoID,
|
||||
WorkflowSourceCommitSHA: run.CommitSHA,
|
||||
}
|
||||
// Parse workflow/job permissions (no clamping here)
|
||||
if perms := ExtractJobPermissionsFromWorkflow(v, job); perms != nil {
|
||||
runJob.TokenPermissions = perms
|
||||
}
|
||||
|
||||
if isReusableWorkflowCaller {
|
||||
runJob.IsReusableCaller = true
|
||||
runJob.CallUses = job.Uses
|
||||
}
|
||||
|
||||
// check job concurrency
|
||||
if job.RawConcurrency != nil {
|
||||
rawConcurrency, err := yaml.Marshal(job.RawConcurrency)
|
||||
@@ -188,11 +203,24 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte
|
||||
}
|
||||
}
|
||||
|
||||
hasWaitingJobs = hasWaitingJobs || runJob.Status == actions_model.StatusWaiting
|
||||
// A reusable caller is never dispatched to a runner, so it must not drive the task-version bump.
|
||||
hasWaitingJobs = hasWaitingJobs || (runJob.Status == actions_model.StatusWaiting && !isReusableWorkflowCaller)
|
||||
if err := db.Insert(ctx, runJob); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// expand reusable caller
|
||||
if isReusableWorkflowCaller && runJob.Status == actions_model.StatusWaiting {
|
||||
if err := expandReusableWorkflowCaller(ctx, run, runAttempt, runJob, vars); err != nil {
|
||||
return fmt.Errorf("inline trigger caller %d ready: %w", runJob.ID, err)
|
||||
}
|
||||
// refresh the caller status
|
||||
if err := actions_model.RefreshReusableCallerStatus(ctx, runJob); err != nil {
|
||||
return fmt.Errorf("refresh caller %d status: %w", runJob.ID, err)
|
||||
}
|
||||
hasWaitingCallerJobs = true
|
||||
}
|
||||
|
||||
runJobs = append(runJobs, runJob)
|
||||
}
|
||||
|
||||
@@ -216,5 +244,12 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, content []byte
|
||||
NotifyWorkflowJobsAndRunsStatusUpdate(ctx, cancelledConcurrencyJobs)
|
||||
EmitJobsIfReadyByJobs(cancelledConcurrencyJobs)
|
||||
|
||||
// Post-commit kick for expanded callers: let job_emitter resolve its child jobs
|
||||
if hasWaitingCallerJobs {
|
||||
if err := EmitJobsIfReadyByRun(run.ID); err != nil {
|
||||
log.Error("emit run %d after InsertRun: %v", run.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user