executor: fix task status update when runservice is unavailable

* don't remove the runningTask when executeTask finishes but just mark the
runningTask as not executing
* add a loop to periodically update executorTask status and remove the
runningTask if not executing and status update was successful
* remove runningTask when it disappears from the runservice
This commit is contained in:
Simone Gotti 2019-06-13 12:39:34 +02:00
parent d6feb10e8f
commit 57446f7dcd

View File

@ -761,7 +761,8 @@ func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) {
}
rt := &runningTask{
et: et,
et: et,
executing: true,
}
rt.Lock()
@ -771,7 +772,11 @@ func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) {
return
}
defer e.runningTasks.delete(et.ID)
defer func() {
rt.Lock()
rt.executing = false
rt.Unlock()
}()
et.Status.Phase = types.ExecutorTaskPhaseRunning
et.Status.StartTime = util.TimePtr(time.Now())
@ -1032,7 +1037,7 @@ func (e *Executor) podsCleaner(ctx context.Context) error {
func (e *Executor) executorStatusSenderLoop(ctx context.Context) {
for {
log.Debugf("executorStatusSender")
log.Debugf("executorStatusSenderLoop")
if err := e.sendExecutorStatus(ctx); err != nil {
log.Errorf("err: %+v", err)
@ -1048,6 +1053,40 @@ func (e *Executor) executorStatusSenderLoop(ctx context.Context) {
}
}
// executorTasksStatusSenderLoop periodically sends the status of every task
// tracked in e.runningTasks, until ctx is done.
//
// On each round, every known running task is locked and its ExecutorTask is
// passed to sendExecutorTaskStatus. If the send fails the error is logged and
// the task is left in place, so the send is attempted again on the next round.
// If the send succeeds and the task is no longer executing, the task is
// removed from e.runningTasks.
func (e *Executor) executorTasksStatusSenderLoop(ctx context.Context) {
	for {
		log.Debugf("executorTasksStatusSenderLoop")

		for _, rtID := range e.runningTasks.ids() {
			rt, ok := e.runningTasks.get(rtID)
			if !ok {
				// the task was removed after the ids snapshot was taken
				continue
			}

			rt.Lock()
			if err := e.sendExecutorTaskStatus(ctx, rt.et); err != nil {
				log.Errorf("err: %+v", err)
				rt.Unlock()
				continue
			}

			// remove running task if send was successful and it's not executing
			if !rt.executing {
				e.runningTasks.delete(rtID)
			}
			rt.Unlock()
		}

		// Wait before the next round, but return as soon as ctx is done
		// instead of sleeping through the cancellation.
		select {
		case <-ctx.Done():
			return
		case <-time.After(2 * time.Second):
		}
	}
}
func (e *Executor) tasksUpdaterLoop(ctx context.Context) {
for {
log.Debugf("tasksUpdater")
@ -1080,6 +1119,18 @@ func (e *Executor) tasksUpdater(ctx context.Context) error {
go e.taskUpdater(ctx, et)
}
// remove runningTasks not existing in the runservice
etIDsMap := map[string]struct{}{}
for _, et := range ets {
etIDsMap[et.ID] = struct{}{}
}
for _, rtID := range e.runningTasks.ids() {
if _, ok := etIDsMap[rtID]; !ok {
e.runningTasks.delete(rtID)
}
}
return nil
}
@ -1176,6 +1227,8 @@ type runningTask struct {
et *types.ExecutorTask
pod driver.Pod
executing bool
}
func (r *runningTasks) get(rtID string) (*runningTask, bool) {
@ -1213,6 +1266,16 @@ func (r *runningTasks) len() int {
return len(r.tasks)
}
// ids returns the IDs of all the tasks currently stored in r.tasks, in no
// particular order (map iteration order). The returned slice is a fresh,
// non-nil snapshot taken while holding r.m.
func (r *runningTasks) ids() []string {
	r.m.Lock()
	defer r.m.Unlock()

	taskIDs := make([]string, 0, len(r.tasks))
	for taskID := range r.tasks {
		taskIDs = append(taskIDs, taskID)
	}

	return taskIDs
}
func (e *Executor) handleTasks(ctx context.Context, c <-chan *types.ExecutorTask) {
for et := range c {
go e.executeTask(ctx, et)
@ -1337,6 +1400,7 @@ func (e *Executor) Run(ctx context.Context) error {
apirouter.Handle("/executor/archives", archivesHandler).Methods("GET")
go e.executorStatusSenderLoop(ctx)
go e.executorTasksStatusSenderLoop(ctx)
go e.podsCleanerLoop(ctx)
go e.tasksUpdaterLoop(ctx)
go e.tasksDataCleanerLoop(ctx)