executor: serialize task handling
taskUpdater will be called serially and won't block. It'll execute a goroutine for executing the task and for sending the task state to the scheduler. executeTask will just start task execution, all the logic of choosing if starting a task is moved inside taskUpdater In this way we avoid concurrency issues when handling the same executorTask in parallel
This commit is contained in:
parent
268a2b83ea
commit
88dbca15a3
@ -724,32 +724,7 @@ func (e *Executor) sendExecutorTaskStatus(ctx context.Context, et *types.Executo
|
||||
return err
|
||||
}
|
||||
|
||||
func (e *Executor) stopTask(ctx context.Context, et *types.ExecutorTask) {
|
||||
if rt, ok := e.runningTasks.get(et.ID); ok {
|
||||
rt.Lock()
|
||||
defer rt.Unlock()
|
||||
if rt.et.Status.Phase.IsFinished() {
|
||||
return
|
||||
}
|
||||
if rt.pod != nil {
|
||||
if err := rt.pod.Stop(ctx); err != nil {
|
||||
log.Errorf("err: %+v", err)
|
||||
return
|
||||
}
|
||||
if rt.et.Status.Phase == types.ExecutorTaskPhaseNotStarted {
|
||||
rt.et.Status.Phase = types.ExecutorTaskPhaseCancelled
|
||||
} else {
|
||||
rt.et.Status.Phase = types.ExecutorTaskPhaseStopped
|
||||
}
|
||||
if err := e.sendExecutorTaskStatus(ctx, et); err != nil {
|
||||
log.Errorf("err: %+v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) {
|
||||
func (e *Executor) executeTask(ctx context.Context, rt *runningTask) {
|
||||
// * save in local state that we have a running task
|
||||
// * start the pod
|
||||
// * then update the executortask status to in-progress
|
||||
@ -757,37 +732,16 @@ func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) {
|
||||
// In this way we are sure that the pod cleaner will only remove pod that don't
|
||||
// have an in progress running task
|
||||
|
||||
if et.Status.Phase != types.ExecutorTaskPhaseNotStarted {
|
||||
log.Debugf("task phase is not \"not started\"")
|
||||
return
|
||||
}
|
||||
|
||||
activeTasks := e.runningTasks.len()
|
||||
// don't start task if we have reached the active tasks limit
|
||||
// they will be executed later
|
||||
if activeTasks > e.c.ActiveTasksLimit {
|
||||
return
|
||||
}
|
||||
|
||||
rt := &runningTask{
|
||||
et: et,
|
||||
executing: true,
|
||||
}
|
||||
|
||||
rt.Lock()
|
||||
|
||||
if !e.runningTasks.addIfNotExists(et.ID, rt) {
|
||||
log.Debugf("task %s already running", et.ID)
|
||||
rt.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
rt.Lock()
|
||||
rt.executing = false
|
||||
rt.Unlock()
|
||||
}()
|
||||
|
||||
et := rt.et
|
||||
|
||||
et.Status.Phase = types.ExecutorTaskPhaseRunning
|
||||
et.Status.StartTime = util.TimeP(time.Now())
|
||||
et.Status.SetupStep.Phase = types.ExecutorTaskPhaseRunning
|
||||
@ -1151,7 +1105,7 @@ func (e *Executor) tasksUpdater(ctx context.Context) error {
|
||||
}
|
||||
log.Debugf("ets: %v", util.Dump(ets))
|
||||
for _, et := range ets {
|
||||
go e.taskUpdater(ctx, et)
|
||||
e.taskUpdater(ctx, et)
|
||||
}
|
||||
|
||||
// remove runningTasks not existing in the runservice
|
||||
@ -1175,31 +1129,73 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) {
|
||||
return
|
||||
}
|
||||
|
||||
if et.Spec.Stop {
|
||||
e.stopTask(ctx, et)
|
||||
}
|
||||
rt, _ := e.runningTasks.get(et.ID)
|
||||
if rt != nil {
|
||||
rt.Lock()
|
||||
// update running task Spec.Stop value only when there's a transitions from false to true,
|
||||
// other spec values cannot change once the task has been scheduled
|
||||
if !rt.et.Spec.Stop && et.Spec.Stop {
|
||||
rt.et.Spec.Stop = et.Spec.Stop
|
||||
|
||||
if et.Status.Phase == types.ExecutorTaskPhaseNotStarted {
|
||||
e.executeTask(ctx, et)
|
||||
}
|
||||
|
||||
if et.Status.Phase == types.ExecutorTaskPhaseRunning {
|
||||
_, ok := e.runningTasks.get(et.ID)
|
||||
if !ok {
|
||||
log.Infof("marking executor task %s as failed since there's no running task", et.ID)
|
||||
et.Status.Phase = types.ExecutorTaskPhaseFailed
|
||||
et.Status.EndTime = util.TimeP(time.Now())
|
||||
// mark in progress step as failed too
|
||||
for _, s := range et.Status.Steps {
|
||||
if s.Phase == types.ExecutorTaskPhaseRunning {
|
||||
s.Phase = types.ExecutorTaskPhaseFailed
|
||||
s.EndTime = util.TimeP(time.Now())
|
||||
if !rt.et.Status.Phase.IsFinished() && rt.pod != nil {
|
||||
if err := rt.pod.Stop(ctx); err != nil {
|
||||
log.Errorf("err: %+v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
rt.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// rt == nil
|
||||
|
||||
// only send cancelled phase when the executor task isn't in running tasks and is not started
|
||||
if et.Spec.Stop && et.Status.Phase == types.ExecutorTaskPhaseNotStarted {
|
||||
et.Status.Phase = types.ExecutorTaskPhaseCancelled
|
||||
go func() {
|
||||
if err := e.sendExecutorTaskStatus(ctx, et); err != nil {
|
||||
log.Errorf("err: %+v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if !et.Spec.Stop && et.Status.Phase == types.ExecutorTaskPhaseRunning {
|
||||
log.Infof("marking executor task %s as failed since there's no running task", et.ID)
|
||||
et.Status.Phase = types.ExecutorTaskPhaseFailed
|
||||
et.Status.EndTime = util.TimeP(time.Now())
|
||||
// mark in progress step as failed too
|
||||
for _, s := range et.Status.Steps {
|
||||
if s.Phase == types.ExecutorTaskPhaseRunning {
|
||||
s.Phase = types.ExecutorTaskPhaseFailed
|
||||
s.EndTime = util.TimeP(time.Now())
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
if err := e.sendExecutorTaskStatus(ctx, et); err != nil {
|
||||
log.Errorf("err: %+v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if !et.Spec.Stop && et.Status.Phase == types.ExecutorTaskPhaseNotStarted {
|
||||
activeTasks := e.runningTasks.len()
|
||||
// don't start task if we have reached the active tasks limit (they will be retried
|
||||
// on next taskUpdater calls)
|
||||
if activeTasks > e.c.ActiveTasksLimit {
|
||||
return
|
||||
}
|
||||
rt := &runningTask{
|
||||
et: et,
|
||||
executing: true,
|
||||
}
|
||||
|
||||
if !e.runningTasks.addIfNotExists(et.ID, rt) {
|
||||
log.Warnf("task %s already running, this shouldn't happen", et.ID)
|
||||
return
|
||||
}
|
||||
|
||||
go e.executeTask(ctx, rt)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1309,7 +1305,7 @@ func (r *runningTasks) ids() []string {
|
||||
|
||||
func (e *Executor) handleTasks(ctx context.Context, c <-chan *types.ExecutorTask) {
|
||||
for et := range c {
|
||||
go e.executeTask(ctx, et)
|
||||
e.taskUpdater(ctx, et)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user