runservice: fix/improve executor delete logic
* Don't fail tasks inside the delete executor action, just delete the executor from etcd * The scheduler, when detecting a task without a related executor will mark the task as failed and correctly set end time of the task and its steps.
This commit is contained in:
parent
504cd3d6dc
commit
b81ad4cd8c
@ -554,21 +554,6 @@ func (h *ActionHandler) ApproveRunTask(ctx context.Context, req *RunTaskApproveR
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *ActionHandler) DeleteExecutor(ctx context.Context, executorID string) error {
|
func (h *ActionHandler) DeleteExecutor(ctx context.Context, executorID string) error {
|
||||||
// mark all executor tasks as failed
|
|
||||||
ets, err := store.GetExecutorTasks(ctx, h.e, executorID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, et := range ets {
|
|
||||||
et.Status.Phase = types.ExecutorTaskPhaseFailed
|
|
||||||
et.FailError = "executor deleted"
|
|
||||||
if _, err := store.AtomicPutExecutorTask(ctx, h.e, et); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete the executor
|
|
||||||
if err := store.DeleteExecutor(ctx, h.e, executorID); err != nil {
|
if err := store.DeleteExecutor(ctx, h.e, executorID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -836,7 +836,15 @@ func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.Executor
|
|||||||
}
|
}
|
||||||
if executor == nil {
|
if executor == nil {
|
||||||
log.Warnf("executor with id %q doesn't exist. marking executor task %q as failed", et.Status.ExecutorID, et.ID)
|
log.Warnf("executor with id %q doesn't exist. marking executor task %q as failed", et.Status.ExecutorID, et.ID)
|
||||||
|
et.FailError = "executor deleted"
|
||||||
et.Status.Phase = types.ExecutorTaskPhaseFailed
|
et.Status.Phase = types.ExecutorTaskPhaseFailed
|
||||||
|
et.Status.EndTime = util.TimePtr(time.Now())
|
||||||
|
for _, s := range et.Status.Steps {
|
||||||
|
if s.Phase == types.ExecutorTaskPhaseRunning {
|
||||||
|
s.Phase = types.ExecutorTaskPhaseFailed
|
||||||
|
s.EndTime = util.TimePtr(time.Now())
|
||||||
|
}
|
||||||
|
}
|
||||||
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
|
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -1301,7 +1309,6 @@ func (s *Runservice) finishedRunsArchiver(ctx context.Context) error {
|
|||||||
// finishedRunArchiver archives a run if it's finished and all the fetching
|
// finishedRunArchiver archives a run if it's finished and all the fetching
|
||||||
// phases (logs and archives) are marked as finished
|
// phases (logs and archives) are marked as finished
|
||||||
func (s *Runservice) finishedRunArchiver(ctx context.Context, r *types.Run) error {
|
func (s *Runservice) finishedRunArchiver(ctx context.Context, r *types.Run) error {
|
||||||
//log.Debugf("r: %s", util.Dump(r))
|
|
||||||
if !r.Phase.IsFinished() {
|
if !r.Phase.IsFinished() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user