diff --git a/internal/services/executor/executor.go b/internal/services/executor/executor.go index 0bfa469..2ce957f 100644 --- a/internal/services/executor/executor.go +++ b/internal/services/executor/executor.go @@ -724,32 +724,7 @@ func (e *Executor) sendExecutorTaskStatus(ctx context.Context, et *types.Executo return err } -func (e *Executor) stopTask(ctx context.Context, et *types.ExecutorTask) { - if rt, ok := e.runningTasks.get(et.ID); ok { - rt.Lock() - defer rt.Unlock() - if rt.et.Status.Phase.IsFinished() { - return - } - if rt.pod != nil { - if err := rt.pod.Stop(ctx); err != nil { - log.Errorf("err: %+v", err) - return - } - if rt.et.Status.Phase == types.ExecutorTaskPhaseNotStarted { - rt.et.Status.Phase = types.ExecutorTaskPhaseCancelled - } else { - rt.et.Status.Phase = types.ExecutorTaskPhaseStopped - } - if err := e.sendExecutorTaskStatus(ctx, et); err != nil { - log.Errorf("err: %+v", err) - return - } - } - } -} - -func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) { +func (e *Executor) executeTask(ctx context.Context, rt *runningTask) { // * save in local state that we have a running task // * start the pod // * then update the executortask status to in-progress @@ -757,37 +732,16 @@ func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) { // In this way we are sure that the pod cleaner will only remove pod that don't // have an in progress running task - if et.Status.Phase != types.ExecutorTaskPhaseNotStarted { - log.Debugf("task phase is not \"not started\"") - return - } - - activeTasks := e.runningTasks.len() - // don't start task if we have reached the active tasks limit - // they will be executed later - if activeTasks > e.c.ActiveTasksLimit { - return - } - - rt := &runningTask{ - et: et, - executing: true, - } - rt.Lock() - if !e.runningTasks.addIfNotExists(et.ID, rt) { - log.Debugf("task %s already running", et.ID) - rt.Unlock() - return - } - defer func() { rt.Lock() rt.executing = false rt.Unlock() }() + et := rt.et + et.Status.Phase = types.ExecutorTaskPhaseRunning et.Status.StartTime = util.TimeP(time.Now()) et.Status.SetupStep.Phase = types.ExecutorTaskPhaseRunning @@ -1151,7 +1105,7 @@ func (e *Executor) tasksUpdater(ctx context.Context) error { } log.Debugf("ets: %v", util.Dump(ets)) for _, et := range ets { - go e.taskUpdater(ctx, et) + e.taskUpdater(ctx, et) } // remove runningTasks not existing in the runservice @@ -1175,31 +1129,73 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) { return } - if et.Spec.Stop { - e.stopTask(ctx, et) - } + rt, _ := e.runningTasks.get(et.ID) + if rt != nil { + rt.Lock() + // update running task Spec.Stop value only when there's a transitions from false to true, + // other spec values cannot change once the task has been scheduled + if !rt.et.Spec.Stop && et.Spec.Stop { + rt.et.Spec.Stop = et.Spec.Stop - if et.Status.Phase == types.ExecutorTaskPhaseNotStarted { - e.executeTask(ctx, et) - } - - if et.Status.Phase == types.ExecutorTaskPhaseRunning { - _, ok := e.runningTasks.get(et.ID) - if !ok { - log.Infof("marking executor task %s as failed since there's no running task", et.ID) - et.Status.Phase = types.ExecutorTaskPhaseFailed - et.Status.EndTime = util.TimeP(time.Now()) - // mark in progress step as failed too - for _, s := range et.Status.Steps { - if s.Phase == types.ExecutorTaskPhaseRunning { - s.Phase = types.ExecutorTaskPhaseFailed - s.EndTime = util.TimeP(time.Now()) + if !rt.et.Status.Phase.IsFinished() && rt.pod != nil { + if err := rt.pod.Stop(ctx); err != nil { + log.Errorf("err: %+v", err) } } + } + rt.Unlock() + + return + } + + // rt == nil + + // only send cancelled phase when the executor task isn't in running tasks and is not started + if et.Spec.Stop && et.Status.Phase == types.ExecutorTaskPhaseNotStarted { + et.Status.Phase = types.ExecutorTaskPhaseCancelled + go func() { if err := e.sendExecutorTaskStatus(ctx, et); err != nil { log.Errorf("err: %+v", err) } + }() + } + + if !et.Spec.Stop && et.Status.Phase == types.ExecutorTaskPhaseRunning { + log.Infof("marking executor task %s as failed since there's no running task", et.ID) + et.Status.Phase = types.ExecutorTaskPhaseFailed + et.Status.EndTime = util.TimeP(time.Now()) + // mark in progress step as failed too + for _, s := range et.Status.Steps { + if s.Phase == types.ExecutorTaskPhaseRunning { + s.Phase = types.ExecutorTaskPhaseFailed + s.EndTime = util.TimeP(time.Now()) + } } + go func() { + if err := e.sendExecutorTaskStatus(ctx, et); err != nil { + log.Errorf("err: %+v", err) + } + }() + } + + if !et.Spec.Stop && et.Status.Phase == types.ExecutorTaskPhaseNotStarted { + activeTasks := e.runningTasks.len() + // don't start task if we have reached the active tasks limit (they will be retried + // on next taskUpdater calls) + if activeTasks > e.c.ActiveTasksLimit { + return + } + rt := &runningTask{ + et: et, + executing: true, + } + + if !e.runningTasks.addIfNotExists(et.ID, rt) { + log.Warnf("task %s already running, this shouldn't happen", et.ID) + return + } + + go e.executeTask(ctx, rt) } } @@ -1309,7 +1305,7 @@ func (r *runningTasks) ids() []string { func (e *Executor) handleTasks(ctx context.Context, c <-chan *types.ExecutorTask) { for et := range c { - go e.executeTask(ctx, et) + e.taskUpdater(ctx, et) } }