From 07bc4a21ffa5f302fa195b6a58174f6e440b974a Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 24 Apr 2019 13:25:41 +0200 Subject: [PATCH] runservice scheduler: automatically remove dynamic executors --- .../runservice/scheduler/api/executor.go | 49 +++++++++++++++++-- .../runservice/scheduler/scheduler.go | 2 +- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/internal/services/runservice/scheduler/api/executor.go b/internal/services/runservice/scheduler/api/executor.go index 3e093a4..676c576 100644 --- a/internal/services/runservice/scheduler/api/executor.go +++ b/internal/services/runservice/scheduler/api/executor.go @@ -16,6 +16,7 @@ package api import ( "bufio" + "context" "encoding/json" "io" "net/http" @@ -33,12 +34,13 @@ import ( ) type ExecutorStatusHandler struct { - e *etcd.Store - c chan<- *types.ExecutorTask + log *zap.SugaredLogger + e *etcd.Store + ch *command.CommandHandler } -func NewExecutorStatusHandler(e *etcd.Store, c chan<- *types.ExecutorTask) *ExecutorStatusHandler { - return &ExecutorStatusHandler{e: e, c: c} +func NewExecutorStatusHandler(logger *zap.Logger, e *etcd.Store, ch *command.CommandHandler) *ExecutorStatusHandler { + return &ExecutorStatusHandler{log: logger.Sugar(), e: e, ch: ch} } func (h *ExecutorStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -58,6 +60,45 @@ func (h *ExecutorStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request http.Error(w, "", http.StatusInternalServerError) return } + + if err := h.deleteStaleExecutors(ctx, executor); err != nil { + http.Error(w, "", http.StatusInternalServerError) + return + } +} + +func (h *ExecutorStatusHandler) deleteStaleExecutors(ctx context.Context, curExecutor *types.Executor) error { + executors, err := store.GetExecutors(ctx, h.e) + if err != nil { + return err + } + + for _, executor := range executors { + if executor.ID == curExecutor.ID { + continue + } + if !executor.Dynamic { + continue + } + if executor.ExecutorGroup != curExecutor.ExecutorGroup { + continue + } + // executor is dynamic and in the same executor group + active := false + for _, seID := range curExecutor.SiblingsExecutors { + if executor.ID == seID { + active = true + break + } + } + if !active { + if err := h.ch.DeleteExecutor(ctx, executor.ID); err != nil { + h.log.Errorf("failed to delete executor %q: %v", executor.ID, err) + } + } + } + + return nil } type ExecutorTaskStatusHandler struct { diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index b60ea1d..a6d23fe 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -1625,7 +1625,7 @@ func (s *Scheduler) Run(ctx context.Context) error { corsHandler = ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions) // executor dedicated api, only calls from executor should happen on these handlers - executorStatusHandler := api.NewExecutorStatusHandler(s.e, ch) + executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ch) executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch) executorTaskHandler := api.NewExecutorTaskHandler(s.e) executorTasksHandler := api.NewExecutorTasksHandler(s.e)