diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 7161407..9b7f356 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -44,25 +44,6 @@ const ( defaultExecutorNotAliveInterval = 60 * time.Second ) -func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) { - // the real source of active tasks is the number of executor tasks in etcd - // we can't rely on RunTask.Status since it's only updated when receiveing - // updated from the executor so it could be in a NotStarted state but have an - // executor tasks scheduled and running - ets, err := store.GetExecutorTasksForRun(ctx, s.e, runID) - if err != nil { - return nil, err - } - activeTasks := []*types.ExecutorTask{} - for _, et := range ets { - if !et.Status.Phase.IsFinished() { - activeTasks = append(activeTasks, et) - } - } - - return activeTasks, nil -} - func taskMatchesParentDependCondition(ctx context.Context, rt *types.RunTask, r *types.Run, rc *types.RunConfig) bool { rct := rc.Tasks[rt.ID] parents := runconfig.GetParents(rc.Tasks, rct) @@ -96,7 +77,7 @@ func taskMatchesParentDependCondition(ctx context.Context, rt *types.RunTask, r return len(parents) == matchedNum } -func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) (*types.Run, error) { +func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig, scheduledExecutorTasks []*types.ExecutorTask) (*types.Run, error) { log.Debugf("run: %s", util.Dump(curRun)) log.Debugf("rc: %s", util.Dump(rc)) @@ -107,7 +88,7 @@ func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig // if the run is set to stop, skip all not running tasks for _, rt := range newRun.Tasks { isScheduled := false - for _, et := range activeExecutorTasks { + for _, et := range scheduledExecutorTasks { if rt.ID == et.ID { isScheduled = true } @@ -139,7 +120,7 @@ func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig // cancel task if the run has a result set and is not yet scheduled if curRun.Result.IsSet() { isScheduled := false - for _, et := range activeExecutorTasks { + for _, et := range scheduledExecutorTasks { if rt.ID == et.ID { isScheduled = true } @@ -480,12 +461,16 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru prevPhase := r.Phase prevResult := r.Result - activeExecutorTasks, err := s.runActiveExecutorTasks(ctx, r.ID) + // the real source of active tasks is the number of executor tasks in etcd + // we can't rely on RunTask.Status since it's only updated when receiveing + // updated from the executor so it could be in a NotStarted state but have an + // executor tasks scheduled and running + scheduledExecutorTasks, err := store.GetExecutorTasksForRun(ctx, s.e, r.ID) if err != nil { return err } - if err := advanceRun(ctx, r, rc, activeExecutorTasks); err != nil { + if err := advanceRun(ctx, r, rc, scheduledExecutorTasks); err != nil { return err } @@ -506,7 +491,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru // if the run is set to stop, stop all active tasks if r.Stop { - for _, et := range activeExecutorTasks { + for _, et := range scheduledExecutorTasks { et.Spec.Stop = true if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { return err @@ -519,7 +504,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru // advance tasks if r.Phase == types.RunPhaseRunning { - r, err := advanceRunTasks(ctx, r, rc, activeExecutorTasks) + r, err := advanceRunTasks(ctx, r, rc, scheduledExecutorTasks) if err != nil { return err } @@ -541,9 +526,9 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru // advanceRun updates the run result and phase. It must be the unique function that // should update them. -func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) error { +func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, scheduledExecutorTasks []*types.ExecutorTask) error { log.Debugf("run: %s", util.Dump(r)) - hasActiveTasks := len(activeExecutorTasks) > 0 + hasScheduledTasks := len(scheduledExecutorTasks) > 0 // fail run if a task is failed if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning { @@ -595,7 +580,7 @@ func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeEx } if finished && !r.Phase.IsFinished() { - if !hasActiveTasks { + if !hasScheduledTasks { r.ChangePhase(types.RunPhaseFinished) } } diff --git a/internal/services/runservice/scheduler_test.go b/internal/services/runservice/scheduler_test.go index 35a0c12..cf15b3b 100644 --- a/internal/services/runservice/scheduler_test.go +++ b/internal/services/runservice/scheduler_test.go @@ -125,11 +125,11 @@ func TestAdvanceRunTasks(t *testing.T) { } tests := []struct { - name string - rc *types.RunConfig - r *types.Run - activeExecutorTasks []*types.ExecutorTask - out *types.Run + name string + rc *types.RunConfig + r *types.Run + scheduledExecutorTasks []*types.ExecutorTask + out *types.Run }{ { name: "test top level task not started", @@ -343,7 +343,7 @@ func TestAdvanceRunTasks(t *testing.T) { run.Tasks["task04"].Status = types.RunTaskStatusSuccess return run }(), - activeExecutorTasks: []*types.ExecutorTask{ + scheduledExecutorTasks: []*types.ExecutorTask{ &types.ExecutorTask{ID: "task01"}, }, out: func() *types.Run { @@ -371,7 +371,7 @@ func TestAdvanceRunTasks(t *testing.T) { run.Stop = true return run }(), - activeExecutorTasks: []*types.ExecutorTask{ + scheduledExecutorTasks: []*types.ExecutorTask{ &types.ExecutorTask{ID: "task01"}, }, out: func() *types.Run { @@ -390,7 +390,7 @@ func TestAdvanceRunTasks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() - r, err := advanceRunTasks(ctx, tt.r, tt.rc, tt.activeExecutorTasks) + r, err := advanceRunTasks(ctx, tt.r, tt.rc, tt.scheduledExecutorTasks) if err != nil { t.Fatalf("unexpected error: %v", err) }