runservice scheduler: take a copy of run in advanceRunTasks

take and change a copy of the current run so we'll change newRun and use curRun
status for logic decision. In this way result are reproducible or they will be
affected by the random run.Tasks map iteration order.
This commit is contained in:
Simone Gotti 2019-04-17 18:06:31 +02:00
parent 4dd89646af
commit 9f89a923c0
2 changed files with 16 additions and 10 deletions

View File

@ -97,12 +97,15 @@ func (s *Scheduler) runHasActiveExecutorTasks(ctx context.Context, runID string)
return len(activeTasks) > 0, nil
}
func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) error {
log.Debugf("run: %s", util.Dump(r))
func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig) (*types.Run, error) {
log.Debugf("run: %s", util.Dump(curRun))
log.Debugf("rc: %s", util.Dump(rc))
// take a deepcopy of r so we do logic only on fixed status and not affeccted by current changes (due to random map iteration)
newRun := curRun.DeepCopy()
// get tasks that can be executed
for _, rt := range r.Tasks {
for _, rt := range newRun.Tasks {
if rt.Skip {
continue
}
@ -114,7 +117,8 @@ func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) err
parents := runconfig.GetParents(rc.Tasks, rct)
finishedParents := 0
for _, p := range parents {
rp := r.Tasks[p.ID]
// use current run status to not be affected by previous changes to to random map iteration
rp := curRun.Tasks[p.ID]
if rp.Status.IsFinished() && rp.ArchivesFetchFinished() {
finishedParents++
}
@ -127,7 +131,7 @@ func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) err
if allParentsFinished {
for _, p := range parents {
matched := false
rp := r.Tasks[p.ID]
rp := curRun.Tasks[p.ID]
conds := runconfig.GetParentDependConditions(rct, p)
for _, cond := range conds {
switch cond {
@ -164,7 +168,7 @@ func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) err
}
}
return nil
return newRun, nil
}
func getTasksToRun(ctx context.Context, r *types.Run, rc *types.RunConfig) ([]*types.RunTask, error) {
@ -463,10 +467,11 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run
// advance tasks
if r.Phase == types.RunPhaseRunning {
if err := advanceRunTasks(ctx, r, rc); err != nil {
r, err := advanceRunTasks(ctx, r, rc)
if err != nil {
return err
}
r, err := store.AtomicPutRun(ctx, s.e, r, nil, nil)
r, err = store.AtomicPutRun(ctx, s.e, r, nil, nil)
if err != nil {
return err
}

View File

@ -306,10 +306,11 @@ func TestAdvanceRunTasks(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
if err := advanceRunTasks(ctx, tt.r, tt.rc); err != nil {
r, err := advanceRunTasks(ctx, tt.r, tt.rc)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if diff := cmp.Diff(tt.out, tt.r); diff != "" {
if diff := cmp.Diff(tt.out, r); diff != "" {
t.Error(diff)
}
})