runservice: use all scheduled tasks in scheduleRun

rename activeExecutorTasks to scheduledExecutorTasks and don't filter out
finished tasks.
In some logic we need all the scheduled tasks and not only the not finished
ones.
This commit is contained in:
Simone Gotti 2020-02-27 14:59:04 +01:00
parent a4e280cd7d
commit 3ac018e6e5
2 changed files with 22 additions and 37 deletions

View File

@ -44,25 +44,6 @@ const (
defaultExecutorNotAliveInterval = 60 * time.Second defaultExecutorNotAliveInterval = 60 * time.Second
) )
func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) {
// the real source of active tasks is the number of executor tasks in etcd
// we can't rely on RunTask.Status since it's only updated when receiveing
// updated from the executor so it could be in a NotStarted state but have an
// executor tasks scheduled and running
ets, err := store.GetExecutorTasksForRun(ctx, s.e, runID)
if err != nil {
return nil, err
}
activeTasks := []*types.ExecutorTask{}
for _, et := range ets {
if !et.Status.Phase.IsFinished() {
activeTasks = append(activeTasks, et)
}
}
return activeTasks, nil
}
func taskMatchesParentDependCondition(ctx context.Context, rt *types.RunTask, r *types.Run, rc *types.RunConfig) bool { func taskMatchesParentDependCondition(ctx context.Context, rt *types.RunTask, r *types.Run, rc *types.RunConfig) bool {
rct := rc.Tasks[rt.ID] rct := rc.Tasks[rt.ID]
parents := runconfig.GetParents(rc.Tasks, rct) parents := runconfig.GetParents(rc.Tasks, rct)
@ -96,7 +77,7 @@ func taskMatchesParentDependCondition(ctx context.Context, rt *types.RunTask, r
return len(parents) == matchedNum return len(parents) == matchedNum
} }
func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) (*types.Run, error) { func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig, scheduledExecutorTasks []*types.ExecutorTask) (*types.Run, error) {
log.Debugf("run: %s", util.Dump(curRun)) log.Debugf("run: %s", util.Dump(curRun))
log.Debugf("rc: %s", util.Dump(rc)) log.Debugf("rc: %s", util.Dump(rc))
@ -107,7 +88,7 @@ func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig
// if the run is set to stop, skip all not running tasks // if the run is set to stop, skip all not running tasks
for _, rt := range newRun.Tasks { for _, rt := range newRun.Tasks {
isScheduled := false isScheduled := false
for _, et := range activeExecutorTasks { for _, et := range scheduledExecutorTasks {
if rt.ID == et.ID { if rt.ID == et.ID {
isScheduled = true isScheduled = true
} }
@ -139,7 +120,7 @@ func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig
// cancel task if the run has a result set and is not yet scheduled // cancel task if the run has a result set and is not yet scheduled
if curRun.Result.IsSet() { if curRun.Result.IsSet() {
isScheduled := false isScheduled := false
for _, et := range activeExecutorTasks { for _, et := range scheduledExecutorTasks {
if rt.ID == et.ID { if rt.ID == et.ID {
isScheduled = true isScheduled = true
} }
@ -480,12 +461,16 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
prevPhase := r.Phase prevPhase := r.Phase
prevResult := r.Result prevResult := r.Result
activeExecutorTasks, err := s.runActiveExecutorTasks(ctx, r.ID) // the real source of active tasks is the number of executor tasks in etcd
// we can't rely on RunTask.Status since it's only updated when receiveing
// updated from the executor so it could be in a NotStarted state but have an
// executor tasks scheduled and running
scheduledExecutorTasks, err := store.GetExecutorTasksForRun(ctx, s.e, r.ID)
if err != nil { if err != nil {
return err return err
} }
if err := advanceRun(ctx, r, rc, activeExecutorTasks); err != nil { if err := advanceRun(ctx, r, rc, scheduledExecutorTasks); err != nil {
return err return err
} }
@ -506,7 +491,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
// if the run is set to stop, stop all active tasks // if the run is set to stop, stop all active tasks
if r.Stop { if r.Stop {
for _, et := range activeExecutorTasks { for _, et := range scheduledExecutorTasks {
et.Spec.Stop = true et.Spec.Stop = true
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
return err return err
@ -519,7 +504,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
// advance tasks // advance tasks
if r.Phase == types.RunPhaseRunning { if r.Phase == types.RunPhaseRunning {
r, err := advanceRunTasks(ctx, r, rc, activeExecutorTasks) r, err := advanceRunTasks(ctx, r, rc, scheduledExecutorTasks)
if err != nil { if err != nil {
return err return err
} }
@ -541,9 +526,9 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
// advanceRun updates the run result and phase. It must be the unique function that // advanceRun updates the run result and phase. It must be the unique function that
// should update them. // should update them.
func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) error { func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, scheduledExecutorTasks []*types.ExecutorTask) error {
log.Debugf("run: %s", util.Dump(r)) log.Debugf("run: %s", util.Dump(r))
hasActiveTasks := len(activeExecutorTasks) > 0 hasScheduledTasks := len(scheduledExecutorTasks) > 0
// fail run if a task is failed // fail run if a task is failed
if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning { if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning {
@ -595,7 +580,7 @@ func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeEx
} }
if finished && !r.Phase.IsFinished() { if finished && !r.Phase.IsFinished() {
if !hasActiveTasks { if !hasScheduledTasks {
r.ChangePhase(types.RunPhaseFinished) r.ChangePhase(types.RunPhaseFinished)
} }
} }

View File

@ -128,7 +128,7 @@ func TestAdvanceRunTasks(t *testing.T) {
name string name string
rc *types.RunConfig rc *types.RunConfig
r *types.Run r *types.Run
activeExecutorTasks []*types.ExecutorTask scheduledExecutorTasks []*types.ExecutorTask
out *types.Run out *types.Run
}{ }{
{ {
@ -343,7 +343,7 @@ func TestAdvanceRunTasks(t *testing.T) {
run.Tasks["task04"].Status = types.RunTaskStatusSuccess run.Tasks["task04"].Status = types.RunTaskStatusSuccess
return run return run
}(), }(),
activeExecutorTasks: []*types.ExecutorTask{ scheduledExecutorTasks: []*types.ExecutorTask{
&types.ExecutorTask{ID: "task01"}, &types.ExecutorTask{ID: "task01"},
}, },
out: func() *types.Run { out: func() *types.Run {
@ -371,7 +371,7 @@ func TestAdvanceRunTasks(t *testing.T) {
run.Stop = true run.Stop = true
return run return run
}(), }(),
activeExecutorTasks: []*types.ExecutorTask{ scheduledExecutorTasks: []*types.ExecutorTask{
&types.ExecutorTask{ID: "task01"}, &types.ExecutorTask{ID: "task01"},
}, },
out: func() *types.Run { out: func() *types.Run {
@ -390,7 +390,7 @@ func TestAdvanceRunTasks(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
ctx := context.Background() ctx := context.Background()
r, err := advanceRunTasks(ctx, tt.r, tt.rc, tt.activeExecutorTasks) r, err := advanceRunTasks(ctx, tt.r, tt.rc, tt.scheduledExecutorTasks)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }