Merge pull request #224 from sgotti/executor_use_cancellable_context_in_executetask
executor: use cancellable context in executetask
This commit is contained in:
commit
4da7c23bb8
|
@ -614,7 +614,11 @@ func (dp *DockerPod) Exec(ctx context.Context, execConfig *ExecConfig) (Containe
|
||||||
|
|
||||||
func (e *DockerContainerExec) Wait(ctx context.Context) (int, error) {
|
func (e *DockerContainerExec) Wait(ctx context.Context) (int, error) {
|
||||||
// ignore error, we'll use the exit code of the exec
|
// ignore error, we'll use the exit code of the exec
|
||||||
<-e.endCh
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return 0, ctx.Err()
|
||||||
|
case <-e.endCh:
|
||||||
|
}
|
||||||
|
|
||||||
var exitCode int
|
var exitCode int
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -724,7 +724,7 @@ func (e *Executor) sendExecutorTaskStatus(ctx context.Context, et *types.Executo
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Executor) executeTask(ctx context.Context, rt *runningTask) {
|
func (e *Executor) executeTask(rt *runningTask) {
|
||||||
// * save in local state that we have a running task
|
// * save in local state that we have a running task
|
||||||
// * start the pod
|
// * start the pod
|
||||||
// * then update the executortask status to in-progress
|
// * then update the executortask status to in-progress
|
||||||
|
@ -733,10 +733,21 @@ func (e *Executor) executeTask(ctx context.Context, rt *runningTask) {
|
||||||
// have an in progress running task
|
// have an in progress running task
|
||||||
|
|
||||||
rt.Lock()
|
rt.Lock()
|
||||||
|
ctx := rt.ctx
|
||||||
|
|
||||||
|
// wait for context to be done and then stop the pod if running
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
if rt.pod != nil {
|
||||||
|
if err := rt.pod.Stop(context.Background()); err != nil {
|
||||||
|
log.Errorf("error stopping the pod: %+v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
rt.Lock()
|
rt.Lock()
|
||||||
rt.executing = false
|
rt.cancel()
|
||||||
rt.Unlock()
|
rt.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -1070,9 +1081,12 @@ func (e *Executor) executorTasksStatusSenderLoop(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove running task if send was successful and it's not executing
|
// remove running task if send was successful and it's not executing
|
||||||
if !rt.executing {
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
e.runningTasks.delete(rtID)
|
e.runningTasks.delete(rtID)
|
||||||
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
rt.Unlock()
|
rt.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1116,14 +1130,18 @@ func (e *Executor) tasksUpdater(ctx context.Context) error {
|
||||||
e.taskUpdater(ctx, et)
|
e.taskUpdater(ctx, et)
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove runningTasks not existing in the runservice
|
// stop and remove runningTasks not existing in the runservice
|
||||||
etIDsMap := map[string]struct{}{}
|
etIDsMap := map[string]struct{}{}
|
||||||
for _, et := range ets {
|
for _, et := range ets {
|
||||||
etIDsMap[et.ID] = struct{}{}
|
etIDsMap[et.ID] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, rtID := range e.runningTasks.ids() {
|
for _, rtID := range e.runningTasks.ids() {
|
||||||
if _, ok := etIDsMap[rtID]; !ok {
|
if _, ok := etIDsMap[rtID]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if rt, ok := e.runningTasks.get(rtID); ok {
|
||||||
|
rt.cancel()
|
||||||
e.runningTasks.delete(rtID)
|
e.runningTasks.delete(rtID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1145,11 +1163,8 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) {
|
||||||
if !rt.et.Spec.Stop && et.Spec.Stop {
|
if !rt.et.Spec.Stop && et.Spec.Stop {
|
||||||
rt.et.Spec.Stop = et.Spec.Stop
|
rt.et.Spec.Stop = et.Spec.Stop
|
||||||
|
|
||||||
if !rt.et.Status.Phase.IsFinished() && rt.pod != nil {
|
// cancel the running task
|
||||||
if err := rt.pod.Stop(ctx); err != nil {
|
rt.cancel()
|
||||||
log.Errorf("err: %+v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
rt.Unlock()
|
rt.Unlock()
|
||||||
|
|
||||||
|
@ -1193,9 +1208,11 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) {
|
||||||
if activeTasks > e.c.ActiveTasksLimit {
|
if activeTasks > e.c.ActiveTasksLimit {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
rtCtx, rtCancel := context.WithCancel(ctx)
|
||||||
rt := &runningTask{
|
rt := &runningTask{
|
||||||
et: et,
|
et: et,
|
||||||
executing: true,
|
ctx: rtCtx,
|
||||||
|
cancel: rtCancel,
|
||||||
}
|
}
|
||||||
|
|
||||||
if !e.runningTasks.addIfNotExists(et.ID, rt) {
|
if !e.runningTasks.addIfNotExists(et.ID, rt) {
|
||||||
|
@ -1203,7 +1220,7 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
go e.executeTask(ctx, rt)
|
go e.executeTask(rt)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1266,10 +1283,11 @@ type runningTasks struct {
|
||||||
type runningTask struct {
|
type runningTask struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
|
||||||
et *types.ExecutorTask
|
et *types.ExecutorTask
|
||||||
pod driver.Pod
|
pod driver.Pod
|
||||||
|
|
||||||
executing bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *runningTasks) get(rtID string) (*runningTask, bool) {
|
func (r *runningTasks) get(rtID string) (*runningTask, bool) {
|
||||||
|
|
Loading…
Reference in New Issue