runservice: mark not running tasks as skipped when run marked to stop

Currently when a run is marked to stop we are going to stop currently running
tasks and then their childs will be marked as skipped.

But tasks not depending on a stopped task (root task or childs with a finished
parent) that are just waiting for an executor slot, will be scheduled when
there will be a free slot also if the run is marked to stop (and then the
scheduler will stop them after some seconds).

This patch will mark all not started tasks as skipped when the run is marked to
stop.
This commit is contained in:
Simone Gotti 2020-02-26 13:38:27 +01:00
parent 3e611f6cd1
commit 5dd9e587fe
2 changed files with 50 additions and 3 deletions

View File

@ -103,6 +103,24 @@ func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig
// take a deepcopy of r so we do logic only on fixed status and not affeccted by current changes (due to random map iteration) // take a deepcopy of r so we do logic only on fixed status and not affeccted by current changes (due to random map iteration)
newRun := curRun.DeepCopy() newRun := curRun.DeepCopy()
if newRun.Stop {
// if the run is set to stop, skip all not running tasks
for _, rt := range newRun.Tasks {
isScheduled := false
for _, et := range activeExecutorTasks {
if rt.ID == et.ID {
isScheduled = true
}
}
if isScheduled {
continue
}
if rt.Status == types.RunTaskStatusNotStarted {
rt.Status = types.RunTaskStatusSkipped
}
}
}
// handle root tasks // handle root tasks
for _, rt := range newRun.Tasks { for _, rt := range newRun.Tasks {
if rt.Skip { if rt.Skip {
@ -473,7 +491,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
return err return err
} }
// if the run is set to stop, stop all tasks // if the run is set to stop, stop all active tasks
if r.Stop { if r.Stop {
for _, et := range activeExecutorTasks { for _, et := range activeExecutorTasks {
et.Spec.Stop = true et.Spec.Stop = true

View File

@ -65,8 +65,9 @@ func TestAdvanceRunTasks(t *testing.T) {
Skip: false, Skip: false,
}, },
"task04": &types.RunConfigTask{ "task04": &types.RunConfigTask{
ID: "task04", ID: "task04",
Name: "task04", Name: "task04",
Depends: map[string]*types.RunConfigTaskDepend{},
Runtime: &types.Runtime{Type: types.RuntimeType("pod"), Runtime: &types.Runtime{Type: types.RuntimeType("pod"),
Containers: []*types.Container{{Image: "image01"}}, Containers: []*types.Container{{Image: "image01"}},
}, },
@ -356,6 +357,34 @@ func TestAdvanceRunTasks(t *testing.T) {
return run return run
}(), }(),
}, },
{
name: "skip all not started tasks when run is set to stop",
rc: func() *types.RunConfig {
rc := rc.DeepCopy()
return rc
}(),
r: func() *types.Run {
run := run.DeepCopy()
run.Tasks["task01"].Status = types.RunTaskStatusRunning
run.Tasks["task04"].Status = types.RunTaskStatusSuccess
run.Tasks["task03"].Status = types.RunTaskStatusCancelled
run.Stop = true
return run
}(),
activeExecutorTasks: []*types.ExecutorTask{
&types.ExecutorTask{ID: "task01"},
},
out: func() *types.Run {
run := run.DeepCopy()
run.Stop = true
run.Tasks["task01"].Status = types.RunTaskStatusRunning
run.Tasks["task02"].Status = types.RunTaskStatusSkipped
run.Tasks["task03"].Status = types.RunTaskStatusCancelled
run.Tasks["task04"].Status = types.RunTaskStatusSuccess
run.Tasks["task05"].Status = types.RunTaskStatusSkipped
return run
}(),
},
} }
for _, tt := range tests { for _, tt := range tests {