Merge pull request #218 from sgotti/runservice_minimize_scheduling_of_tasks_that_will_be_queued_by_the_executor

runservice: minimize scheduling of tasks that will be queued by the executor
This commit is contained in:
Simone Gotti 2020-02-27 16:08:48 +01:00 committed by GitHub
commit 268a2b83ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 45 additions and 7 deletions

View File

@ -615,7 +615,7 @@ func (h *ActionHandler) GetExecutorTask(ctx context.Context, etID string) (*type
}
func (h *ActionHandler) GetExecutorTasks(ctx context.Context, executorID string) ([]*types.ExecutorTask, error) {
ets, err := store.GetExecutorTasks(ctx, h.e, executorID)
ets, err := store.GetExecutorTasksForExecutor(ctx, h.e, executorID)
if err != nil && err != etcd.ErrKeyNotFound {
return nil, err
}

View File

@ -290,10 +290,16 @@ func (s *Runservice) chooseExecutor(ctx context.Context, rct *types.RunConfigTas
if err != nil {
return nil, err
}
return chooseExecutor(executors, rct), nil
// TODO(sgotti) find a way to avoid retrieving this for every chooseExecutor
// invocation (i.e. use an etcd watcher to keep this value updated)
executorTasksCount, err := store.GetExecutorTasksCountByExecutor(ctx, s.e)
if err != nil {
return nil, err
}
return chooseExecutor(executors, executorTasksCount, rct), nil
}
func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *types.Executor {
func chooseExecutor(executors []*types.Executor, executorTasksCount map[string]int, rct *types.RunConfigTask) *types.Executor {
requiresPrivilegedContainers := false
for _, c := range rct.Runtime.Containers {
if c.Privileged {
@ -326,7 +332,14 @@ func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *type
}
if e.ActiveTasksLimit != 0 {
if e.ActiveTasks >= e.ActiveTasksLimit {
// will be 0 when executorTasksCount[e.ID] doesn't exist
activeTasks := executorTasksCount[e.ID]
if e.ActiveTasks > activeTasks {
activeTasks = e.ActiveTasks
}
// calculate the active tasks by the max between the current scheduled
// tasks in the store and the executor reported tasks
if activeTasks >= e.ActiveTasksLimit {
continue
}
}
@ -674,7 +687,7 @@ func (s *Runservice) updateRunTaskStatus(ctx context.Context, et *types.Executor
}
}
if wrongstatus {
log.Warnf("wrong executor task status: %s, rt status: %s", et.Status.Phase, rt.Status)
log.Warnf("wrong executor task %q status: %q, rt status: %q", et.ID, et.Status.Phase, rt.Status)
return nil
}
@ -742,6 +755,7 @@ func (s *Runservice) executorTasksCleanerLoop(ctx context.Context) {
}
func (s *Runservice) executorTasksCleaner(ctx context.Context) error {
// TODO(sgotti) use paged List
resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0)
if err != nil {
return err
@ -855,6 +869,7 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error {
}
defer func() { _ = m.Unlock(ctx) }()
// TODO(sgotti) use paged List
resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0)
if err != nil {
return err

View File

@ -686,7 +686,7 @@ func TestChooseExecutor(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := chooseExecutor(tt.executors, tt.rct)
e := chooseExecutor(tt.executors, map[string]int{}, tt.rct)
if e == nil && tt.out == nil {
return
}

View File

@ -211,6 +211,7 @@ func GetExecutor(ctx context.Context, e *etcd.Store, executorID string) (*types.
}
func GetExecutors(ctx context.Context, e *etcd.Store) ([]*types.Executor, error) {
// TODO(sgotti) use paged List
resp, err := e.List(ctx, common.EtcdExecutorsDir, "", 0)
if err != nil {
return nil, err
@ -298,7 +299,28 @@ func DeleteExecutorTask(ctx context.Context, e *etcd.Store, etID string) error {
return e.Delete(ctx, common.EtcdTaskKey(etID))
}
func GetExecutorTasks(ctx context.Context, e *etcd.Store, executorID string) ([]*types.ExecutorTask, error) {
func GetExecutorTasksCountByExecutor(ctx context.Context, e *etcd.Store) (map[string]int, error) {
// TODO(sgotti) use paged List
resp, err := e.List(ctx, common.EtcdTasksDir, "", 0)
if err != nil {
return nil, err
}
count := map[string]int{}
for _, kv := range resp.Kvs {
var et *types.ExecutorTask
if err := json.Unmarshal(kv.Value, &et); err != nil {
return nil, err
}
count[et.Spec.ExecutorID] = count[et.Spec.ExecutorID] + 1
}
return count, nil
}
func GetExecutorTasksForExecutor(ctx context.Context, e *etcd.Store, executorID string) ([]*types.ExecutorTask, error) {
// TODO(sgotti) use paged List
resp, err := e.List(ctx, common.EtcdTasksDir, "", 0)
if err != nil {
return nil, err
@ -478,6 +500,7 @@ func DeleteRun(ctx context.Context, e *etcd.Store, runID string) error {
}
func GetRuns(ctx context.Context, e *etcd.Store) ([]*types.Run, error) {
// TODO(sgotti) use paged List
resp, err := e.List(ctx, common.EtcdRunsDir, "", 0)
if err != nil {
return nil, err