Merge pull request #220 from sgotti/runservice_use_all_scheduled_tasks_in_scheduleRun

runservice: use all scheduled tasks in scheduleRun
This commit is contained in:
Simone Gotti 2020-02-28 15:03:39 +01:00 committed by GitHub
commit cf8884d43a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 37 deletions

View File

@ -44,25 +44,6 @@ const (
defaultExecutorNotAliveInterval = 60 * time.Second defaultExecutorNotAliveInterval = 60 * time.Second
) )
func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) {
// the real source of active tasks is the number of executor tasks in etcd
// we can't rely on RunTask.Status since it's only updated when receiveing
// updated from the executor so it could be in a NotStarted state but have an
// executor tasks scheduled and running
ets, err := store.GetExecutorTasksForRun(ctx, s.e, runID)
if err != nil {
return nil, err
}
activeTasks := []*types.ExecutorTask{}
for _, et := range ets {
if !et.Status.Phase.IsFinished() {
activeTasks = append(activeTasks, et)
}
}
return activeTasks, nil
}
func taskMatchesParentDependCondition(ctx context.Context, rt *types.RunTask, r *types.Run, rc *types.RunConfig) bool { func taskMatchesParentDependCondition(ctx context.Context, rt *types.RunTask, r *types.Run, rc *types.RunConfig) bool {
rct := rc.Tasks[rt.ID] rct := rc.Tasks[rt.ID]
parents := runconfig.GetParents(rc.Tasks, rct) parents := runconfig.GetParents(rc.Tasks, rct)
@ -96,7 +77,7 @@ func taskMatchesParentDependCondition(ctx context.Context, rt *types.RunTask, r
return len(parents) == matchedNum return len(parents) == matchedNum
} }
func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) (*types.Run, error) { func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig, scheduledExecutorTasks []*types.ExecutorTask) (*types.Run, error) {
log.Debugf("run: %s", util.Dump(curRun)) log.Debugf("run: %s", util.Dump(curRun))
log.Debugf("rc: %s", util.Dump(rc)) log.Debugf("rc: %s", util.Dump(rc))
@ -107,7 +88,7 @@ func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig
// if the run is set to stop, skip all not running tasks // if the run is set to stop, skip all not running tasks
for _, rt := range newRun.Tasks { for _, rt := range newRun.Tasks {
isScheduled := false isScheduled := false
for _, et := range activeExecutorTasks { for _, et := range scheduledExecutorTasks {
if rt.ID == et.ID { if rt.ID == et.ID {
isScheduled = true isScheduled = true
} }
@ -139,7 +120,7 @@ func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig
// cancel task if the run has a result set and is not yet scheduled // cancel task if the run has a result set and is not yet scheduled
if curRun.Result.IsSet() { if curRun.Result.IsSet() {
isScheduled := false isScheduled := false
for _, et := range activeExecutorTasks { for _, et := range scheduledExecutorTasks {
if rt.ID == et.ID { if rt.ID == et.ID {
isScheduled = true isScheduled = true
} }
@ -480,12 +461,16 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
prevPhase := r.Phase prevPhase := r.Phase
prevResult := r.Result prevResult := r.Result
activeExecutorTasks, err := s.runActiveExecutorTasks(ctx, r.ID) // the real source of active tasks is the number of executor tasks in etcd
// we can't rely on RunTask.Status since it's only updated when receiveing
// updated from the executor so it could be in a NotStarted state but have an
// executor tasks scheduled and running
scheduledExecutorTasks, err := store.GetExecutorTasksForRun(ctx, s.e, r.ID)
if err != nil { if err != nil {
return err return err
} }
if err := advanceRun(ctx, r, rc, activeExecutorTasks); err != nil { if err := advanceRun(ctx, r, rc, scheduledExecutorTasks); err != nil {
return err return err
} }
@ -506,7 +491,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
// if the run is set to stop, stop all active tasks // if the run is set to stop, stop all active tasks
if r.Stop { if r.Stop {
for _, et := range activeExecutorTasks { for _, et := range scheduledExecutorTasks {
et.Spec.Stop = true et.Spec.Stop = true
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
return err return err
@ -519,7 +504,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
// advance tasks // advance tasks
if r.Phase == types.RunPhaseRunning { if r.Phase == types.RunPhaseRunning {
r, err := advanceRunTasks(ctx, r, rc, activeExecutorTasks) r, err := advanceRunTasks(ctx, r, rc, scheduledExecutorTasks)
if err != nil { if err != nil {
return err return err
} }
@ -541,9 +526,9 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
// advanceRun updates the run result and phase. It must be the unique function that // advanceRun updates the run result and phase. It must be the unique function that
// should update them. // should update them.
func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) error { func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, scheduledExecutorTasks []*types.ExecutorTask) error {
log.Debugf("run: %s", util.Dump(r)) log.Debugf("run: %s", util.Dump(r))
hasActiveTasks := len(activeExecutorTasks) > 0 hasScheduledTasks := len(scheduledExecutorTasks) > 0
// fail run if a task is failed // fail run if a task is failed
if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning { if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning {
@ -595,7 +580,7 @@ func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeEx
} }
if finished && !r.Phase.IsFinished() { if finished && !r.Phase.IsFinished() {
if !hasActiveTasks { if !hasScheduledTasks {
r.ChangePhase(types.RunPhaseFinished) r.ChangePhase(types.RunPhaseFinished)
} }
} }

View File

@ -128,7 +128,7 @@ func TestAdvanceRunTasks(t *testing.T) {
name string name string
rc *types.RunConfig rc *types.RunConfig
r *types.Run r *types.Run
activeExecutorTasks []*types.ExecutorTask scheduledExecutorTasks []*types.ExecutorTask
out *types.Run out *types.Run
}{ }{
{ {
@ -343,7 +343,7 @@ func TestAdvanceRunTasks(t *testing.T) {
run.Tasks["task04"].Status = types.RunTaskStatusSuccess run.Tasks["task04"].Status = types.RunTaskStatusSuccess
return run return run
}(), }(),
activeExecutorTasks: []*types.ExecutorTask{ scheduledExecutorTasks: []*types.ExecutorTask{
&types.ExecutorTask{ID: "task01"}, &types.ExecutorTask{ID: "task01"},
}, },
out: func() *types.Run { out: func() *types.Run {
@ -371,7 +371,7 @@ func TestAdvanceRunTasks(t *testing.T) {
run.Stop = true run.Stop = true
return run return run
}(), }(),
activeExecutorTasks: []*types.ExecutorTask{ scheduledExecutorTasks: []*types.ExecutorTask{
&types.ExecutorTask{ID: "task01"}, &types.ExecutorTask{ID: "task01"},
}, },
out: func() *types.Run { out: func() *types.Run {
@ -390,7 +390,7 @@ func TestAdvanceRunTasks(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
ctx := context.Background() ctx := context.Background()
r, err := advanceRunTasks(ctx, tt.r, tt.rc, tt.activeExecutorTasks) r, err := advanceRunTasks(ctx, tt.r, tt.rc, tt.scheduledExecutorTasks)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }