From 145c87b4c0552a311a719ecbc8081d60d61f9523 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 26 Feb 2020 16:25:33 +0100 Subject: [PATCH] runservice: minimize scheduling of tasks that will be queued by the executor Since the executor only periodically updates its state we could end up scheduling many more tasks than the executor ActiveTasksLimit. This will happen in the case of many parallel tasks that can all start at the same time. To avoid this also consider the executor tasks saved in etcd that represent the real view of scheduled tasks. --- internal/services/runservice/action/action.go | 2 +- internal/services/runservice/scheduler.go | 23 ++++++++++++++--- .../services/runservice/scheduler_test.go | 2 +- internal/services/runservice/store/store.go | 25 ++++++++++++++++++- 4 files changed, 45 insertions(+), 7 deletions(-) diff --git a/internal/services/runservice/action/action.go b/internal/services/runservice/action/action.go index 2010fcd..cf13cf3 100644 --- a/internal/services/runservice/action/action.go +++ b/internal/services/runservice/action/action.go @@ -615,7 +615,7 @@ func (h *ActionHandler) GetExecutorTask(ctx context.Context, etID string) (*type } func (h *ActionHandler) GetExecutorTasks(ctx context.Context, executorID string) ([]*types.ExecutorTask, error) { - ets, err := store.GetExecutorTasks(ctx, h.e, executorID) + ets, err := store.GetExecutorTasksForExecutor(ctx, h.e, executorID) if err != nil && err != etcd.ErrKeyNotFound { return nil, err } diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index c1c28dd..7161407 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -290,10 +290,16 @@ func (s *Runservice) chooseExecutor(ctx context.Context, rct *types.RunConfigTas if err != nil { return nil, err } - return chooseExecutor(executors, rct), nil + // TODO(sgotti) find a way to avoid retrieving this for every chooseExecutor + // invocation 
(i.e. use an etcd watcher to keep this value updated) + executorTasksCount, err := store.GetExecutorTasksCountByExecutor(ctx, s.e) + if err != nil { + return nil, err + } + return chooseExecutor(executors, executorTasksCount, rct), nil } -func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *types.Executor { +func chooseExecutor(executors []*types.Executor, executorTasksCount map[string]int, rct *types.RunConfigTask) *types.Executor { requiresPrivilegedContainers := false for _, c := range rct.Runtime.Containers { if c.Privileged { @@ -326,7 +332,14 @@ func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *type } if e.ActiveTasksLimit != 0 { - if e.ActiveTasks >= e.ActiveTasksLimit { + // will be 0 when executorTasksCount[e.ID] doesn't exist + activeTasks := executorTasksCount[e.ID] + if e.ActiveTasks > activeTasks { + activeTasks = e.ActiveTasks + } + // calculate the active tasks by the max between the current scheduled + // tasks in the store and the executor reported tasks + if activeTasks >= e.ActiveTasksLimit { continue } } @@ -674,7 +687,7 @@ func (s *Runservice) updateRunTaskStatus(ctx context.Context, et *types.Executor } } if wrongstatus { - log.Warnf("wrong executor task status: %s, rt status: %s", et.Status.Phase, rt.Status) + log.Warnf("wrong executor task %q status: %q, rt status: %q", et.ID, et.Status.Phase, rt.Status) return nil } @@ -742,6 +755,7 @@ func (s *Runservice) executorTasksCleanerLoop(ctx context.Context) { } func (s *Runservice) executorTasksCleaner(ctx context.Context) error { + // TODO(sgotti) use paged List resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0) if err != nil { return err @@ -855,6 +869,7 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error { } defer func() { _ = m.Unlock(ctx) }() + // TODO(sgotti) use paged List resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0) if err != nil { return err diff --git a/internal/services/runservice/scheduler_test.go 
b/internal/services/runservice/scheduler_test.go index 2d1164e..35a0c12 100644 --- a/internal/services/runservice/scheduler_test.go +++ b/internal/services/runservice/scheduler_test.go @@ -686,7 +686,7 @@ func TestChooseExecutor(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - e := chooseExecutor(tt.executors, tt.rct) + e := chooseExecutor(tt.executors, map[string]int{}, tt.rct) if e == nil && tt.out == nil { return } diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index 35ae086..d0c472f 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -211,6 +211,7 @@ func GetExecutor(ctx context.Context, e *etcd.Store, executorID string) (*types. } func GetExecutors(ctx context.Context, e *etcd.Store) ([]*types.Executor, error) { + // TODO(sgotti) use paged List resp, err := e.List(ctx, common.EtcdExecutorsDir, "", 0) if err != nil { return nil, err @@ -298,7 +299,28 @@ func DeleteExecutorTask(ctx context.Context, e *etcd.Store, etID string) error { return e.Delete(ctx, common.EtcdTaskKey(etID)) } -func GetExecutorTasks(ctx context.Context, e *etcd.Store, executorID string) ([]*types.ExecutorTask, error) { +func GetExecutorTasksCountByExecutor(ctx context.Context, e *etcd.Store) (map[string]int, error) { + // TODO(sgotti) use paged List + resp, err := e.List(ctx, common.EtcdTasksDir, "", 0) + if err != nil { + return nil, err + } + + count := map[string]int{} + + for _, kv := range resp.Kvs { + var et *types.ExecutorTask + if err := json.Unmarshal(kv.Value, &et); err != nil { + return nil, err + } + count[et.Spec.ExecutorID] = count[et.Spec.ExecutorID] + 1 + } + + return count, nil +} + +func GetExecutorTasksForExecutor(ctx context.Context, e *etcd.Store, executorID string) ([]*types.ExecutorTask, error) { + // TODO(sgotti) use paged List resp, err := e.List(ctx, common.EtcdTasksDir, "", 0) if err != nil { return nil, err @@ -478,6 
+500,7 @@ func DeleteRun(ctx context.Context, e *etcd.Store, runID string) error { } func GetRuns(ctx context.Context, e *etcd.Store) ([]*types.Run, error) { + // TODO(sgotti) use paged List resp, err := e.List(ctx, common.EtcdRunsDir, "", 0) if err != nil { return nil, err