From 991fcc59deaabde8ab246d0854371f3e7b93aaa8 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 11 Apr 2019 23:44:55 +0200 Subject: [PATCH] runservice: stop all running executor tasks when run is marked to stop --- .../services/runservice/executor/executor.go | 17 ++++---- .../runservice/scheduler/scheduler.go | 42 +++++++++++++++---- 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/internal/services/runservice/executor/executor.go b/internal/services/runservice/executor/executor.go index f572a02..a5a601e 100644 --- a/internal/services/runservice/executor/executor.go +++ b/internal/services/runservice/executor/executor.go @@ -672,11 +672,11 @@ func (e *Executor) executorStatusSenderLoop(ctx context.Context) { } } -func (e *Executor) tasksCleanerLoop(ctx context.Context) { +func (e *Executor) tasksUpdaterLoop(ctx context.Context) { for { - log.Debugf("tasksCleaner") + log.Debugf("tasksUpdater") - if err := e.tasksCleaner(ctx); err != nil { + if err := e.tasksUpdater(ctx); err != nil { log.Errorf("err: %+v", err) } @@ -690,7 +690,10 @@ func (e *Executor) tasksCleanerLoop(ctx context.Context) { } } -func (e *Executor) tasksCleaner(ctx context.Context) error { +// taskUpdater fetches the executor tasks from the scheduler and handles them +// this is useful to catch up when some tasks submissions from the scheduler to the executor +// APIs fails +func (e *Executor) tasksUpdater(ctx context.Context) error { ets, _, err := e.runserviceClient.GetExecutorTasks(ctx, e.id) if err != nil { log.Warnf("err: %v", err) @@ -698,13 +701,13 @@ func (e *Executor) tasksCleaner(ctx context.Context) error { } log.Debugf("ets: %v", util.Dump(ets)) for _, et := range ets { - go e.cleanTask(ctx, et) + go e.taskUpdater(ctx, et) } return nil } -func (e *Executor) cleanTask(ctx context.Context, et *types.ExecutorTask) { +func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) { log.Debugf("et: %v", util.Dump(et)) if et.Status.ExecutorID != e.id { return @@ -947,7 +950,7 @@ func (e *Executor) Run(ctx context.Context) error { go e.executorStatusSenderLoop(ctx) go e.podsCleanerLoop(ctx) - go e.tasksCleanerLoop(ctx) + go e.tasksUpdaterLoop(ctx) go e.tasksDataCleanerLoop(ctx) go e.handleTasks(ctx, ch) diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index c3b924d..f66393f 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -62,25 +62,34 @@ func mergeEnv(dest, src map[string]string) { } } -func (s *Scheduler) runHasActiveTasks(ctx context.Context, runID string) (bool, error) { +func (s *Scheduler) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) { // the real source of active tasks is the number of executor tasks in etcd // we can't rely on RunTask.Status since it's only updated when receiveing // updated from the executor so it could be in a NotStarted state but have an // executor tasks scheduled and running ets, err := store.GetExecutorTasksForRun(ctx, s.e, runID) if err != nil { - return false, err + return nil, err } - activeTasks := false + activeTasks := []*types.ExecutorTask{} for _, et := range ets { if !et.Status.Phase.IsFinished() { - activeTasks = true + activeTasks = append(activeTasks, et) } } return activeTasks, nil } +func (s *Scheduler) runHasActiveExecutorTasks(ctx context.Context, runID string) (bool, error) { + activeTasks, err := s.runActiveExecutorTasks(ctx, runID) + if err != nil { + return false, err + } + + return len(activeTasks) > 0, nil +} + func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) error { log.Debugf("run: %s", util.Dump(r)) log.Debugf("rc: %s", util.Dump(rc)) @@ -184,8 +193,6 @@ func (s *Scheduler) submitRunTasks(ctx context.Context, r *types.Run, rc *types. if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { return err } - // try to send executor task to executor, if this fails the executor will - // periodically fetch the executortask anyway if err := s.sendExecutorTask(ctx, et); err != nil { return err } @@ -267,6 +274,8 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types return et } +// sendExecutorTask sends executor task to executor, if this fails the executor +// will periodically fetch the executortask anyway func (s *Scheduler) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error { executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID) if err != nil && err != etcd.ErrKeyNotFound { @@ -361,7 +370,7 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run prevPhase := r.Phase prevResult := r.Result - hasActiveTasks, err := s.runHasActiveTasks(ctx, r.ID) + hasActiveTasks, err := s.runHasActiveExecutorTasks(ctx, r.ID) if err != nil { return err } @@ -385,6 +394,23 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run return err } + // if the run is set to stop, stop all tasks + if r.Stop { + activeExecutorTasks, err := s.runActiveExecutorTasks(ctx, r.ID) + if err != nil { + return err + } + for _, et := range activeExecutorTasks { + et.Stop = true + if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { + return err + } + if err := s.sendExecutorTask(ctx, et); err != nil { + return err + } + } + } + if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning { if err := advanceRunTasks(ctx, r, rc); err != nil { return err @@ -658,8 +684,6 @@ func (s *Scheduler) executorTaskCleaner(ctx context.Context, et *types.ExecutorT if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { return err } - // try to send executor task to executor, if this fails the executor will - // periodically fetch the executortask anyway if err := s.sendExecutorTask(ctx, et); err != nil { log.Errorf("err: %+v", err) return err