From 9f89a923c0638f819beba5f2fe27a5e968319622 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 17 Apr 2019 18:06:31 +0200 Subject: [PATCH] runservice scheduler: take a copy of run in advanceRunTasks take and change a copy of the current run so we'll change newRun and use curRun status for logic decision. In this way result are reproducible or they will be affected by the random run.Tasks map iteration order. --- .../runservice/scheduler/scheduler.go | 21 ++++++++++++------- .../runservice/scheduler/scheduler_test.go | 5 +++-- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index f230436..a8efad9 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -97,12 +97,15 @@ func (s *Scheduler) runHasActiveExecutorTasks(ctx context.Context, runID string) return len(activeTasks) > 0, nil } -func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) error { - log.Debugf("run: %s", util.Dump(r)) +func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig) (*types.Run, error) { + log.Debugf("run: %s", util.Dump(curRun)) log.Debugf("rc: %s", util.Dump(rc)) + // take a deepcopy of r so we do logic only on fixed status and not affeccted by current changes (due to random map iteration) + newRun := curRun.DeepCopy() + // get tasks that can be executed - for _, rt := range r.Tasks { + for _, rt := range newRun.Tasks { if rt.Skip { continue } @@ -114,7 +117,8 @@ func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) err parents := runconfig.GetParents(rc.Tasks, rct) finishedParents := 0 for _, p := range parents { - rp := r.Tasks[p.ID] + // use current run status to not be affected by previous changes to to random map iteration + rp := curRun.Tasks[p.ID] if rp.Status.IsFinished() && rp.ArchivesFetchFinished() { finishedParents++ } @@ -127,7 +131,7 @@ func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) err if allParentsFinished { for _, p := range parents { matched := false - rp := r.Tasks[p.ID] + rp := curRun.Tasks[p.ID] conds := runconfig.GetParentDependConditions(rct, p) for _, cond := range conds { switch cond { @@ -164,7 +168,7 @@ func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) err } } - return nil + return newRun, nil } func getTasksToRun(ctx context.Context, r *types.Run, rc *types.RunConfig) ([]*types.RunTask, error) { @@ -463,10 +467,11 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run // advance tasks if r.Phase == types.RunPhaseRunning { - if err := advanceRunTasks(ctx, r, rc); err != nil { + r, err := advanceRunTasks(ctx, r, rc) + if err != nil { return err } - r, err := store.AtomicPutRun(ctx, s.e, r, nil, nil) + r, err = store.AtomicPutRun(ctx, s.e, r, nil, nil) if err != nil { return err } diff --git a/internal/services/runservice/scheduler/scheduler_test.go b/internal/services/runservice/scheduler/scheduler_test.go index 0ba108d..edd48b1 100644 --- a/internal/services/runservice/scheduler/scheduler_test.go +++ b/internal/services/runservice/scheduler/scheduler_test.go @@ -306,10 +306,11 @@ func TestAdvanceRunTasks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() - if err := advanceRunTasks(ctx, tt.r, tt.rc); err != nil { + r, err := advanceRunTasks(ctx, tt.r, tt.rc) + if err != nil { t.Fatalf("unexpected error: %v", err) } - if diff := cmp.Diff(tt.out, tt.r); diff != "" { + if diff := cmp.Diff(tt.out, r); diff != "" { t.Error(diff) } })