From 1ac139434e0a57746ecae81fcf79ee0de5166615 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 17 Apr 2019 18:00:34 +0200 Subject: [PATCH] runservice scheduler: cancel unscheduled root tasks when run has result When run has a result set, root tasks not yet scheduled must be cancelled. --- .../runservice/scheduler/scheduler.go | 50 +++++++++++--- .../runservice/scheduler/scheduler_test.go | 68 +++++++++++++++++-- 2 files changed, 101 insertions(+), 17 deletions(-) diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index a8efad9..512d88a 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -97,14 +97,47 @@ func (s *Scheduler) runHasActiveExecutorTasks(ctx context.Context, runID string) return len(activeTasks) > 0, nil } -func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig) (*types.Run, error) { +func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) (*types.Run, error) { log.Debugf("run: %s", util.Dump(curRun)) log.Debugf("rc: %s", util.Dump(rc)) // take a deepcopy of r so we do logic only on fixed status and not affeccted by current changes (due to random map iteration) newRun := curRun.DeepCopy() - // get tasks that can be executed + // handle root tasks + for _, rt := range newRun.Tasks { + if rt.Skip { + continue + } + if rt.Status != types.RunTaskStatusNotStarted { + continue + } + + rct := rc.Tasks[rt.ID] + parents := runconfig.GetParents(rc.Tasks, rct) + if len(parents) > 0 { + continue + } + + // cancel task if the run has a result set and is not yet scheduled + if curRun.Result.IsSet() { + isScheduled := false + for _, et := range activeExecutorTasks { + if rt.ID == et.ID { + isScheduled = true + } + } + if isScheduled { + continue + } + + if rt.Status == types.RunTaskStatusNotStarted { + rt.Status = types.RunTaskStatusCancelled + } + } + } + + // handle all tasks for _, rt := range newRun.Tasks { if rt.Skip { continue @@ -424,12 +457,12 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run prevPhase := r.Phase prevResult := r.Result - hasActiveTasks, err := s.runHasActiveExecutorTasks(ctx, r.ID) + activeExecutorTasks, err := s.runActiveExecutorTasks(ctx, r.ID) if err != nil { return err } - if err := advanceRun(ctx, r, rc, hasActiveTasks); err != nil { + if err := advanceRun(ctx, r, rc, activeExecutorTasks); err != nil { return err } @@ -450,10 +483,6 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run // if the run is set to stop, stop all tasks if r.Stop { - activeExecutorTasks, err := s.runActiveExecutorTasks(ctx, r.ID) - if err != nil { - return err - } for _, et := range activeExecutorTasks { et.Stop = true if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { @@ -467,7 +496,7 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run // advance tasks if r.Phase == types.RunPhaseRunning { - r, err := advanceRunTasks(ctx, r, rc) + r, err := advanceRunTasks(ctx, r, rc, activeExecutorTasks) if err != nil { return err } @@ -489,8 +518,9 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run // advanceRun updates the run result and phase. It must be the unique function that // should update them. -func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, hasActiveTasks bool) error { +func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) error { log.Debugf("run: %s", util.Dump(r)) + hasActiveTasks := len(activeExecutorTasks) > 0 // fail run if a task is failed if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning { diff --git a/internal/services/runservice/scheduler/scheduler_test.go b/internal/services/runservice/scheduler/scheduler_test.go index edd48b1..cf6e7d0 100644 --- a/internal/services/runservice/scheduler/scheduler_test.go +++ b/internal/services/runservice/scheduler/scheduler_test.go @@ -89,10 +89,14 @@ func TestAdvanceRunTasks(t *testing.T) { }, } - // initial run that matched the runconfig, all tasks are not started or skipped + // initial run that matched the runconfig: + // * the run is in phase running with result unknown + // * all tasks are not started or skipped // (if the runconfig task as Skip == true). This must match the status // generated by command.genRun() run := &types.Run{ + Phase: types.RunPhaseRunning, + Result: types.RunResultUnknown, Tasks: map[string]*types.RunTask{ "task01": &types.RunTask{ ID: "task01", @@ -118,11 +122,12 @@ func TestAdvanceRunTasks(t *testing.T) { } tests := []struct { - name string - rc *types.RunConfig - r *types.Run - out *types.Run - err error + name string + rc *types.RunConfig + r *types.Run + activeExecutorTasks []*types.ExecutorTask + out *types.Run + err error }{ { name: "test top level task not started", @@ -301,12 +306,61 @@ func TestAdvanceRunTasks(t *testing.T) { return run }(), }, + { + name: "cancel all root not started tasks when run has a result set", + rc: func() *types.RunConfig { + rc := rc.DeepCopy() + return rc + }(), + r: func() *types.Run { + run := run.DeepCopy() + run.Result = types.RunResultSuccess + run.Tasks["task04"].Status = types.RunTaskStatusSuccess + return run + }(), + out: func() *types.Run { + run := run.DeepCopy() + run.Result = types.RunResultSuccess + run.Tasks["task01"].Status = types.RunTaskStatusCancelled + run.Tasks["task02"].Status = types.RunTaskStatusNotStarted + run.Tasks["task03"].Status = types.RunTaskStatusCancelled + run.Tasks["task04"].Status = types.RunTaskStatusSuccess + run.Tasks["task05"].Status = types.RunTaskStatusNotStarted + return run + }(), + }, + { + name: "cancel all root not started tasks when run has a result set (task01 is already scheduled)", + rc: func() *types.RunConfig { + rc := rc.DeepCopy() + return rc + }(), + r: func() *types.Run { + run := run.DeepCopy() + run.Result = types.RunResultSuccess + run.Tasks["task04"].Status = types.RunTaskStatusSuccess + return run + }(), + activeExecutorTasks: []*types.ExecutorTask{ + &types.ExecutorTask{ID: "task01"}, + }, + out: func() *types.Run { + run := run.DeepCopy() + run.Result = types.RunResultSuccess + run.Tasks["task01"].Status = types.RunTaskStatusNotStarted + run.Tasks["task02"].Status = types.RunTaskStatusNotStarted + run.Tasks["task03"].Status = types.RunTaskStatusCancelled + run.Tasks["task04"].Status = types.RunTaskStatusSuccess + run.Tasks["task05"].Status = types.RunTaskStatusNotStarted + return run + }(), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() - r, err := advanceRunTasks(ctx, tt.r, tt.rc) + r, err := advanceRunTasks(ctx, tt.r, tt.rc, tt.activeExecutorTasks) if err != nil { t.Fatalf("unexpected error: %v", err) }