runservice scheduler: automatically remove dynamic executors

This commit is contained in:
Simone Gotti 2019-04-24 13:25:41 +02:00
parent 7c9be9b57d
commit 07bc4a21ff
2 changed files with 46 additions and 5 deletions

View File

@ -16,6 +16,7 @@ package api
import (
"bufio"
"context"
"encoding/json"
"io"
"net/http"
@ -33,12 +34,13 @@ import (
)
type ExecutorStatusHandler struct {
log *zap.SugaredLogger
e *etcd.Store
c chan<- *types.ExecutorTask
ch *command.CommandHandler
}
func NewExecutorStatusHandler(e *etcd.Store, c chan<- *types.ExecutorTask) *ExecutorStatusHandler {
return &ExecutorStatusHandler{e: e, c: c}
func NewExecutorStatusHandler(logger *zap.Logger, e *etcd.Store, ch *command.CommandHandler) *ExecutorStatusHandler {
return &ExecutorStatusHandler{log: logger.Sugar(), e: e, ch: ch}
}
func (h *ExecutorStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -58,6 +60,45 @@ func (h *ExecutorStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
http.Error(w, "", http.StatusInternalServerError)
return
}
if err := h.deleteStaleExecutors(ctx, executor); err != nil {
http.Error(w, "", http.StatusInternalServerError)
return
}
}
func (h *ExecutorStatusHandler) deleteStaleExecutors(ctx context.Context, curExecutor *types.Executor) error {
executors, err := store.GetExecutors(ctx, h.e)
if err != nil {
return err
}
for _, executor := range executors {
if executor.ID == curExecutor.ID {
continue
}
if !executor.Dynamic {
continue
}
if executor.ExecutorGroup != curExecutor.ExecutorGroup {
continue
}
// executor is dynamic and in the same executor group
active := false
for _, seID := range curExecutor.SiblingsExecutors {
if executor.ID == seID {
active = true
break
}
}
if !active {
if err := h.ch.DeleteExecutor(ctx, executor.ID); err != nil {
h.log.Errorf("failed to delete executor %q: %v", executor.ID, err)
}
}
}
return nil
}
type ExecutorTaskStatusHandler struct {

View File

@ -1625,7 +1625,7 @@ func (s *Scheduler) Run(ctx context.Context) error {
corsHandler = ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions)
// executor dedicated api, only calls from executor should happen on these handlers
executorStatusHandler := api.NewExecutorStatusHandler(s.e, ch)
executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ch)
executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch)
executorTaskHandler := api.NewExecutorTaskHandler(s.e)
executorTasksHandler := api.NewExecutorTasksHandler(s.e)