runservice: stop all running executor tasks when run is marked to stop

This commit is contained in:
Simone Gotti 2019-04-11 23:44:55 +02:00
parent c300a37d09
commit 991fcc59de
2 changed files with 43 additions and 16 deletions

View File

@ -672,11 +672,11 @@ func (e *Executor) executorStatusSenderLoop(ctx context.Context) {
}
}
func (e *Executor) tasksCleanerLoop(ctx context.Context) {
func (e *Executor) tasksUpdaterLoop(ctx context.Context) {
for {
log.Debugf("tasksCleaner")
log.Debugf("tasksUpdater")
if err := e.tasksCleaner(ctx); err != nil {
if err := e.tasksUpdater(ctx); err != nil {
log.Errorf("err: %+v", err)
}
@ -690,7 +690,10 @@ func (e *Executor) tasksCleanerLoop(ctx context.Context) {
}
}
func (e *Executor) tasksCleaner(ctx context.Context) error {
// taskUpdater fetches the executor tasks from the scheduler and handles them
// this is useful to catch up when some tasks submissions from the scheduler to the executor
// APIs fails
func (e *Executor) tasksUpdater(ctx context.Context) error {
ets, _, err := e.runserviceClient.GetExecutorTasks(ctx, e.id)
if err != nil {
log.Warnf("err: %v", err)
@ -698,13 +701,13 @@ func (e *Executor) tasksCleaner(ctx context.Context) error {
}
log.Debugf("ets: %v", util.Dump(ets))
for _, et := range ets {
go e.cleanTask(ctx, et)
go e.taskUpdater(ctx, et)
}
return nil
}
func (e *Executor) cleanTask(ctx context.Context, et *types.ExecutorTask) {
func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) {
log.Debugf("et: %v", util.Dump(et))
if et.Status.ExecutorID != e.id {
return
@ -947,7 +950,7 @@ func (e *Executor) Run(ctx context.Context) error {
go e.executorStatusSenderLoop(ctx)
go e.podsCleanerLoop(ctx)
go e.tasksCleanerLoop(ctx)
go e.tasksUpdaterLoop(ctx)
go e.tasksDataCleanerLoop(ctx)
go e.handleTasks(ctx, ch)

View File

@ -62,25 +62,34 @@ func mergeEnv(dest, src map[string]string) {
}
}
func (s *Scheduler) runHasActiveTasks(ctx context.Context, runID string) (bool, error) {
func (s *Scheduler) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) {
// the real source of active tasks is the number of executor tasks in etcd
// we can't rely on RunTask.Status since it's only updated when receiveing
// updated from the executor so it could be in a NotStarted state but have an
// executor tasks scheduled and running
ets, err := store.GetExecutorTasksForRun(ctx, s.e, runID)
if err != nil {
return false, err
return nil, err
}
activeTasks := false
activeTasks := []*types.ExecutorTask{}
for _, et := range ets {
if !et.Status.Phase.IsFinished() {
activeTasks = true
activeTasks = append(activeTasks, et)
}
}
return activeTasks, nil
}
func (s *Scheduler) runHasActiveExecutorTasks(ctx context.Context, runID string) (bool, error) {
activeTasks, err := s.runActiveExecutorTasks(ctx, runID)
if err != nil {
return false, err
}
return len(activeTasks) > 0, nil
}
func advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) error {
log.Debugf("run: %s", util.Dump(r))
log.Debugf("rc: %s", util.Dump(rc))
@ -184,8 +193,6 @@ func (s *Scheduler) submitRunTasks(ctx context.Context, r *types.Run, rc *types.
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
return err
}
// try to send executor task to executor, if this fails the executor will
// periodically fetch the executortask anyway
if err := s.sendExecutorTask(ctx, et); err != nil {
return err
}
@ -267,6 +274,8 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types
return et
}
// sendExecutorTask sends executor task to executor, if this fails the executor
// will periodically fetch the executortask anyway
func (s *Scheduler) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error {
executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID)
if err != nil && err != etcd.ErrKeyNotFound {
@ -361,7 +370,7 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run
prevPhase := r.Phase
prevResult := r.Result
hasActiveTasks, err := s.runHasActiveTasks(ctx, r.ID)
hasActiveTasks, err := s.runHasActiveExecutorTasks(ctx, r.ID)
if err != nil {
return err
}
@ -385,6 +394,23 @@ func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.Run
return err
}
// if the run is set to stop, stop all tasks
if r.Stop {
activeExecutorTasks, err := s.runActiveExecutorTasks(ctx, r.ID)
if err != nil {
return err
}
for _, et := range activeExecutorTasks {
et.Stop = true
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
return err
}
if err := s.sendExecutorTask(ctx, et); err != nil {
return err
}
}
}
if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning {
if err := advanceRunTasks(ctx, r, rc); err != nil {
return err
@ -658,8 +684,6 @@ func (s *Scheduler) executorTaskCleaner(ctx context.Context, et *types.ExecutorT
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
return err
}
// try to send executor task to executor, if this fails the executor will
// periodically fetch the executortask anyway
if err := s.sendExecutorTask(ctx, et); err != nil {
log.Errorf("err: %+v", err)
return err