From a53e14b4e89fd14372d4eb32af65a900df5f66e6 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 12 Jun 2019 18:12:37 +0200 Subject: [PATCH] runservice: check if executor is alive before scheduling tasks Check that the last update time is less than 1 minute (currently hardcoded) --- internal/services/runservice/api/executor.go | 4 + internal/services/runservice/scheduler.go | 16 ++- .../services/runservice/scheduler_test.go | 105 ++++++++++++++++++ internal/services/runservice/types/types.go | 18 +++ 4 files changed, 141 insertions(+), 2 deletions(-) diff --git a/internal/services/runservice/api/executor.go b/internal/services/runservice/api/executor.go index 7ca04d6..ab402b7 100644 --- a/internal/services/runservice/api/executor.go +++ b/internal/services/runservice/api/executor.go @@ -21,6 +21,7 @@ import ( "io" "net/http" "strconv" + "time" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" @@ -57,6 +58,9 @@ func (h *ExecutorStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request return } + // set last status update time + executor.LastStatusUpdateTime = time.Now() + if _, err := store.PutExecutor(ctx, h.e, executor); err != nil { http.Error(w, "", http.StatusInternalServerError) return diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index a1cb895..29a2dee 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -43,6 +43,8 @@ import ( const ( cacheCleanerInterval = 1 * 24 * time.Hour + + defaultExecutorNotAliveInterval = 60 * time.Second ) var level = zap.NewAtomicLevelAt(zapcore.InfoLevel) @@ -273,7 +275,15 @@ func (s *Runservice) chooseExecutor(ctx context.Context, rct *types.RunConfigTas if err != nil { return nil, err } + return chooseExecutor(executors, rct), nil +} + +func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *types.Executor { for _, e := range executors { + if e.LastStatusUpdateTime.Add(defaultExecutorNotAliveInterval).Before(time.Now()) { + continue + } + // if arch is not defined use any executor arch if rct.Runtime.Arch != "" { hasArch := false @@ -286,15 +296,17 @@ func (s *Runservice) chooseExecutor(ctx context.Context, rct *types.RunConfigTas continue } } + if e.ActiveTasksLimit != 0 { if e.ActiveTasks >= e.ActiveTasksLimit { continue } } - return e, nil + return e } - return nil, nil + + return nil } type parentsByLevelName []*types.RunConfigTask diff --git a/internal/services/runservice/scheduler_test.go b/internal/services/runservice/scheduler_test.go index 5be49ff..78b63ac 100644 --- a/internal/services/runservice/scheduler_test.go +++ b/internal/services/runservice/scheduler_test.go @@ -18,8 +18,10 @@ import ( "context" "sort" "testing" + "time" "github.com/google/go-cmp/cmp" + "github.com/sorintlab/agola/internal/common" "github.com/sorintlab/agola/internal/services/runservice/types" ) @@ -539,3 +541,106 @@ func TestGetTasksToRun(t *testing.T) { }) } } + +func TestChooseExecutor(t *testing.T) { + executorOK := &types.Executor{ + ID: "executorOK", + Archs: []common.Arch{common.ArchAMD64}, + ActiveTasksLimit: 2, + ActiveTasks: 0, + LastStatusUpdateTime: time.Now(), + } + + executorNoFreeTaskSlots := func() *types.Executor { + e := executorOK.DeepCopy() + e.ID = "executorNoFreeTasksSlots" + e.ActiveTasks = 2 + return e + }() + + executorNotAlive := func() *types.Executor { + e := executorOK.DeepCopy() + e.ID = "executorNotAlive" + e.LastStatusUpdateTime = time.Now().Add(-120 * time.Second) + return e + }() + + executorOKMultipleArchs := func() *types.Executor { + e := executorOK.DeepCopy() + e.ID = "executorOKMultipleArchs" + e.Archs = []common.Arch{common.ArchAMD64, common.ArchARM64} + return e + }() + + // Only primary and the required variables for this test are set + rct := &types.RunConfigTask{ + ID: "task01", + Name: "task01", + Runtime: &types.Runtime{Type: types.RuntimeType("pod"), + Arch: common.ArchAMD64, + }, + } + + tests := []struct { + name string + executors []*types.Executor + rct *types.RunConfigTask + out *types.Executor + err error + }{ + { + name: "test single executor ok", + executors: []*types.Executor{executorOK}, + // Only primary and the required variables for this test are set + rct: rct, + out: executorOK, + }, + { + name: "test single executor without free task slots", + executors: []*types.Executor{executorNoFreeTaskSlots}, + // Only primary and the required variables for this test are set + rct: rct, + out: nil, + }, + { + name: "test single executor not alive", + executors: []*types.Executor{executorNotAlive}, + rct: rct, + out: nil, + }, + { + name: "test single executor with different arch", + executors: func() []*types.Executor { + e := executorOK.DeepCopy() + e.Archs = []common.Arch{common.ArchARM64} + return []*types.Executor{e} + }(), + rct: rct, + out: nil, + }, + { + name: "test single executor with multiple archs and one matches the task required arch", + executors: []*types.Executor{executorOKMultipleArchs}, + rct: rct, + out: executorOKMultipleArchs, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := chooseExecutor(tt.executors, tt.rct) + if e == nil && tt.out == nil { + return + } + if e == nil && tt.out != nil { + t.Fatalf("expected executor with id: %s, go no executor selected", tt.out.ID) + } + if e != nil && tt.out == nil { + t.Fatalf("expected no executor selected, got executor with id: %s", e.ID) + } + if e != tt.out { + t.Fatalf("wrong executor ID, expected %s, got: %s", tt.out.ID, e.ID) + } + }) + } +} diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go index 7298ab2..de8f7bf 100644 --- a/internal/services/runservice/types/types.go +++ b/internal/services/runservice/types/types.go @@ -333,6 +333,14 @@ type RunConfigTask struct { DockerRegistriesAuth map[string]DockerRegistryAuth `json:"docker_registries_auth"` } +func (rct *RunConfigTask) DeepCopy() *RunConfigTask { + nrct, err := copystructure.Copy(rct) + if err != nil { + panic(err) + } + return nrct.(*RunConfigTask) +} + type RunConfigTaskDependCondition string const ( @@ -611,10 +619,20 @@ type Executor struct { // SiblingExecutors are all the executors in the ExecutorGroup SiblingsExecutors []string `json:"siblings_executors,omitempty"` + LastStatusUpdateTime time.Time `json:"last_status_update_time,omitempty"` + // internal values not saved Revision int64 `json:"-"` } +func (e *Executor) DeepCopy() *Executor { + ne, err := copystructure.Copy(e) + if err != nil { + panic(err) + } + return ne.(*Executor) +} + type RunEvent struct { Sequence string RunID string