runservice: minimize scheduling of tasks that will be queued by the executor
Since the executor only periodically updates its state we could end up scheduling much more tasks than the executor ActiveTasksLimit. This will happen in the case of many parallel tasks that can all start at the same time. To avoid this also considere the executor tasks saved in etcd that represent the real view of scheduled tasks.
This commit is contained in:
parent
e8fdb8dfae
commit
145c87b4c0
@ -615,7 +615,7 @@ func (h *ActionHandler) GetExecutorTask(ctx context.Context, etID string) (*type
|
||||
}
|
||||
|
||||
func (h *ActionHandler) GetExecutorTasks(ctx context.Context, executorID string) ([]*types.ExecutorTask, error) {
|
||||
ets, err := store.GetExecutorTasks(ctx, h.e, executorID)
|
||||
ets, err := store.GetExecutorTasksForExecutor(ctx, h.e, executorID)
|
||||
if err != nil && err != etcd.ErrKeyNotFound {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -290,10 +290,16 @@ func (s *Runservice) chooseExecutor(ctx context.Context, rct *types.RunConfigTas
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return chooseExecutor(executors, rct), nil
|
||||
// TODO(sgotti) find a way to avoid retrieving this for every chooseExecutor
|
||||
// invocation (i.e. use an etcd watcher to keep this value updated)
|
||||
executorTasksCount, err := store.GetExecutorTasksCountByExecutor(ctx, s.e)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return chooseExecutor(executors, executorTasksCount, rct), nil
|
||||
}
|
||||
|
||||
func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *types.Executor {
|
||||
func chooseExecutor(executors []*types.Executor, executorTasksCount map[string]int, rct *types.RunConfigTask) *types.Executor {
|
||||
requiresPrivilegedContainers := false
|
||||
for _, c := range rct.Runtime.Containers {
|
||||
if c.Privileged {
|
||||
@ -326,7 +332,14 @@ func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *type
|
||||
}
|
||||
|
||||
if e.ActiveTasksLimit != 0 {
|
||||
if e.ActiveTasks >= e.ActiveTasksLimit {
|
||||
// will be 0 when executorTasksCount[e.ID] doesn't exist
|
||||
activeTasks := executorTasksCount[e.ID]
|
||||
if e.ActiveTasks > activeTasks {
|
||||
activeTasks = e.ActiveTasks
|
||||
}
|
||||
// calculate the active tasks by the max between the current scheduled
|
||||
// tasks in the store and the executor reported tasks
|
||||
if activeTasks >= e.ActiveTasksLimit {
|
||||
continue
|
||||
}
|
||||
}
|
||||
@ -674,7 +687,7 @@ func (s *Runservice) updateRunTaskStatus(ctx context.Context, et *types.Executor
|
||||
}
|
||||
}
|
||||
if wrongstatus {
|
||||
log.Warnf("wrong executor task status: %s, rt status: %s", et.Status.Phase, rt.Status)
|
||||
log.Warnf("wrong executor task %q status: %q, rt status: %q", et.ID, et.Status.Phase, rt.Status)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -742,6 +755,7 @@ func (s *Runservice) executorTasksCleanerLoop(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (s *Runservice) executorTasksCleaner(ctx context.Context) error {
|
||||
// TODO(sgotti) use paged List
|
||||
resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -855,6 +869,7 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error {
|
||||
}
|
||||
defer func() { _ = m.Unlock(ctx) }()
|
||||
|
||||
// TODO(sgotti) use paged List
|
||||
resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -686,7 +686,7 @@ func TestChooseExecutor(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
e := chooseExecutor(tt.executors, tt.rct)
|
||||
e := chooseExecutor(tt.executors, map[string]int{}, tt.rct)
|
||||
if e == nil && tt.out == nil {
|
||||
return
|
||||
}
|
||||
|
@ -211,6 +211,7 @@ func GetExecutor(ctx context.Context, e *etcd.Store, executorID string) (*types.
|
||||
}
|
||||
|
||||
func GetExecutors(ctx context.Context, e *etcd.Store) ([]*types.Executor, error) {
|
||||
// TODO(sgotti) use paged List
|
||||
resp, err := e.List(ctx, common.EtcdExecutorsDir, "", 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -298,7 +299,28 @@ func DeleteExecutorTask(ctx context.Context, e *etcd.Store, etID string) error {
|
||||
return e.Delete(ctx, common.EtcdTaskKey(etID))
|
||||
}
|
||||
|
||||
func GetExecutorTasks(ctx context.Context, e *etcd.Store, executorID string) ([]*types.ExecutorTask, error) {
|
||||
func GetExecutorTasksCountByExecutor(ctx context.Context, e *etcd.Store) (map[string]int, error) {
|
||||
// TODO(sgotti) use paged List
|
||||
resp, err := e.List(ctx, common.EtcdTasksDir, "", 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
count := map[string]int{}
|
||||
|
||||
for _, kv := range resp.Kvs {
|
||||
var et *types.ExecutorTask
|
||||
if err := json.Unmarshal(kv.Value, &et); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
count[et.Spec.ExecutorID] = count[et.Spec.ExecutorID] + 1
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func GetExecutorTasksForExecutor(ctx context.Context, e *etcd.Store, executorID string) ([]*types.ExecutorTask, error) {
|
||||
// TODO(sgotti) use paged List
|
||||
resp, err := e.List(ctx, common.EtcdTasksDir, "", 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -478,6 +500,7 @@ func DeleteRun(ctx context.Context, e *etcd.Store, runID string) error {
|
||||
}
|
||||
|
||||
func GetRuns(ctx context.Context, e *etcd.Store) ([]*types.Run, error) {
|
||||
// TODO(sgotti) use paged List
|
||||
resp, err := e.List(ctx, common.EtcdRunsDir, "", 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
Loading…
Reference in New Issue
Block a user