From 751361daeaca48784e2954719afc0f8dfe79d3a9 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 10 Apr 2019 14:48:47 +0200 Subject: [PATCH] runservice: refactor scheduling logic * split functions in sub parts to ease future testing * save run fewer times * rework events logic to considere both run phase and result changes (emit an event on every phase or result change) --- .../runservice/scheduler/command/command.go | 12 +- .../runservice/scheduler/common/events.go | 22 +- .../runservice/scheduler/scheduler.go | 259 +++++++++++------- .../runservice/scheduler/store/store.go | 8 +- 4 files changed, 179 insertions(+), 122 deletions(-) diff --git a/internal/services/runservice/scheduler/command/command.go b/internal/services/runservice/scheduler/command/command.go index e9d054d..1f548a2 100644 --- a/internal/services/runservice/scheduler/command/command.go +++ b/internal/services/runservice/scheduler/command/command.go @@ -84,7 +84,7 @@ func (s *CommandHandler) ChangeRunPhase(ctx context.Context, req *RunChangePhase r.Stop = true } - _, err = store.AtomicPutRun(ctx, s.e, r, "", cgt) + _, err = store.AtomicPutRun(ctx, s.e, r, nil, cgt) return err } @@ -112,7 +112,7 @@ func (s *CommandHandler) StopRun(ctx context.Context, req *RunStopRequest) error r.Stop = true } - _, err = store.AtomicPutRun(ctx, s.e, r, "", cgt) + _, err = store.AtomicPutRun(ctx, s.e, r, nil, cgt) return err } @@ -330,7 +330,11 @@ func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcg return err } - if _, err := store.AtomicPutRun(ctx, s.e, run, common.RunEventTypeQueued, runcgt); err != nil { + runEvent, err := common.NewRunEvent(ctx, s.e, run.ID, run.Phase, run.Result) + if err != nil { + return err + } + if _, err := store.AtomicPutRun(ctx, s.e, run, runEvent, runcgt); err != nil { return err } return nil @@ -432,7 +436,7 @@ func (s *CommandHandler) ApproveRunTask(ctx context.Context, req *RunTaskApprove task.Approved = true task.ApprovalAnnotations = req.ApprovalAnnotations - _, err = store.AtomicPutRun(ctx, s.e, r, "", cgt) + _, err = store.AtomicPutRun(ctx, s.e, r, nil, cgt) return err } diff --git a/internal/services/runservice/scheduler/common/events.go b/internal/services/runservice/scheduler/common/events.go index c0aa1be..858a183 100644 --- a/internal/services/runservice/scheduler/common/events.go +++ b/internal/services/runservice/scheduler/common/events.go @@ -19,28 +19,20 @@ import ( "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/sequence" -) - -type RunEventType string - -const ( - RunEventTypeQueued RunEventType = "queued" - RunEventTypeCancelled RunEventType = "cancelled" - RunEventTypeRunning RunEventType = "running" - RunEventTypeSuccess RunEventType = "success" - RunEventTypeFailed RunEventType = "failed" + "github.com/sorintlab/agola/internal/services/runservice/types" ) type RunEvent struct { - Sequence string - EventType RunEventType - RunID string + Sequence string + RunID string + Phase types.RunPhase + Result types.RunResult } -func NewRunEvent(ctx context.Context, e *etcd.Store, runEventType RunEventType, runID string) (*RunEvent, error) { +func NewRunEvent(ctx context.Context, e *etcd.Store, runID string, phase types.RunPhase, result types.RunResult) (*RunEvent, error) { seq, err := sequence.IncSequence(ctx, e, EtcdRunEventSequenceKey) if err != nil { return nil, err } - return &RunEvent{Sequence: seq.String(), EventType: runEventType, RunID: runID}, nil + return &RunEvent{Sequence: seq.String(), RunID: runID, Phase: phase, Result: result}, nil } diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 018b07e..4309b7b 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -81,15 +81,10 @@ func (s *Scheduler) runHasActiveTasks(ctx context.Context, runID string) (bool, return activeTasks, nil } -func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run) error { +func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) error { log.Debugf("run: %s", util.Dump(r)) - rc, err := store.LTSGetRunConfig(s.wal, r.ID) - if err != nil { - return errors.Wrapf(err, "cannot get run config %q", r.ID) - } log.Debugf("rc: %s", util.Dump(rc)) - tasksToRun := []*types.RunTask{} // get tasks that can be executed for _, rt := range r.RunTasks { log.Debugf("rt: %s", util.Dump(rt)) @@ -116,6 +111,40 @@ func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run) error { if rct.NeedsApproval && !rt.WaitingApproval && !rt.Approved { rt.WaitingApproval = true } + } + } + + return nil +} + +func (s *Scheduler) getTasksToRun(ctx context.Context, r *types.Run) ([]*types.RunTask, error) { + log.Debugf("run: %s", util.Dump(r)) + rc, err := store.LTSGetRunConfig(s.wal, r.ID) + if err != nil { + return nil, errors.Wrapf(err, "cannot get run config %q", r.ID) + } + log.Debugf("rc: %s", util.Dump(rc)) + + tasksToRun := []*types.RunTask{} + // get tasks that can be executed + for _, rt := range r.RunTasks { + log.Debugf("rt: %s", util.Dump(rt)) + if rt.Skip { + continue + } + if rt.Status != types.RunTaskStatusNotStarted { + continue + } + + rct := rc.Tasks[rt.ID] + parents := runconfig.GetParents(rc.Tasks, rct) + canRun := true + for _, p := range parents { + rp := r.RunTasks[p.ID] + canRun = rp.Status.IsFinished() && rp.ArchivesFetchFinished() + } + + if canRun { // Run only if approved if needed if !rct.NeedsApproval || (rct.NeedsApproval && rt.Approved) { tasksToRun = append(tasksToRun, rt) @@ -123,18 +152,22 @@ func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run) error { } } - // save run since we may have changed some run tasks to waiting approval - if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil { - return err - } + return tasksToRun, nil +} - log.Debugf("tasksToRun: %s", util.Dump(tasksToRun)) +func (s *Scheduler) submitRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig, tasks []*types.RunTask) error { + log.Debugf("tasksToRun: %s", util.Dump(tasks)) - for _, rt := range tasksToRun { - et, err := s.genExecutorTask(ctx, r, rt, rc) + for _, rt := range tasks { + executor, err := s.chooseExecutor(ctx) if err != nil { return err } + if executor == nil { + return errors.Errorf("cannot choose an executor") + } + + et := s.genExecutorTask(ctx, r, rt, rc, executor) log.Debugf("et: %s", util.Dump(et)) // check that the executorTask wasn't already scheduled @@ -173,15 +206,7 @@ func (s *Scheduler) chooseExecutor(ctx context.Context) (*types.Executor, error) return nil, nil } -func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig) (*types.ExecutorTask, error) { - executor, err := s.chooseExecutor(ctx) - if err != nil { - return nil, err - } - if executor == nil { - return nil, errors.Errorf("cannot choose an executor") - } - +func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask { rct := rc.Tasks[rt.ID] environment := map[string]string{} @@ -238,7 +263,7 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types et.Workspace = ws - return et, nil + return et } func (s *Scheduler) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error { @@ -329,15 +354,76 @@ func (s *Scheduler) compactChangeGroups(ctx context.Context) error { return nil } -func (s *Scheduler) advanceRun(ctx context.Context, runID string) error { - r, _, err := store.GetRun(ctx, s.e, runID) - if err != nil { - return errors.Wrapf(err, "cannot get run %q from etcd", runID) +func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.RunConfig) error { + log.Debugf("r: %s", util.Dump(r)) + + prevPhase := r.Phase + prevResult := r.Result + + if err := s.advanceRun(ctx, r, rc); err != nil { + return err } + + var runEvent *common.RunEvent + // detect changes to phase and result and set related events + if prevPhase != r.Phase || prevResult != r.Result { + var err error + runEvent, err = common.NewRunEvent(ctx, s.e, r.ID, r.Phase, r.Result) + if err != nil { + return err + } + } + + r, err := store.AtomicPutRun(ctx, s.e, r, runEvent, nil) + if err != nil { + return err + } + + if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning { + if err := s.advanceRunTasks(ctx, r, rc); err != nil { + return err + } + r, err := store.AtomicPutRun(ctx, s.e, r, nil, nil) + if err != nil { + return err + } + + tasksToRun, err := s.getTasksToRun(ctx, r) + if err != nil { + return err + } + + return s.submitRunTasks(ctx, r, rc, tasksToRun) + } + + return nil +} + +// advanceRun updates the run result and phase. It must be the unique function that +// should update them. +func (s *Scheduler) advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig) error { log.Debugf("run: %s", util.Dump(r)) - switch { - case !r.Result.IsSet() && r.Phase == types.RunPhaseRunning: + // fail run if a task is failed + if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning { + for _, rt := range r.RunTasks { + rct, ok := rc.Tasks[rt.ID] + log.Debugf("rct: %s", util.Dump(rct)) + if !ok { + return errors.Errorf("no such run config task with id %s for run config %s", rt.ID, rc.ID) + } + if rt.Status == types.RunTaskStatusFailed { + if !rct.IgnoreFailure { + log.Debugf("marking run %q as failed is task %q is failed", r.ID, rt.ID) + r.Result = types.RunResultFailed + break + } + } + } + } + + // see if run could me marked as success + if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning { finished := true for _, rt := range r.RunTasks { if !rt.Status.IsFinished() { @@ -346,26 +432,19 @@ func (s *Scheduler) advanceRun(ctx context.Context, runID string) error { } if finished { r.Result = types.RunResultSuccess - - if _, err := store.AtomicPutRun(ctx, s.e, r, common.RunEventTypeSuccess, nil); err != nil { - return err - } return nil } + } + // if run is set to stop set result as stopped + if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning { if r.Stop { r.Result = types.RunResultStopped } - - if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil { - return err - } - if err := s.advanceRunTasks(ctx, r); err != nil { - return err - } + } // if the run has a result defined then we can stop current tasks - case r.Result.IsSet(): + if r.Result.IsSet() { if !r.Phase.IsFinished() { hasRunningTasks, err := s.runHasActiveTasks(ctx, r.ID) if err != nil { @@ -376,9 +455,6 @@ func (s *Scheduler) advanceRun(ctx context.Context, runID string) error { if !hasRunningTasks { r.ChangePhase(types.RunPhaseFinished) } - if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil { - return err - } } // if the run is finished AND there're no executor tasks scheduled we can mark @@ -396,38 +472,40 @@ func (s *Scheduler) advanceRun(ctx context.Context, runID string) error { } } } - if _, err := store.AtomicPutRun(ctx, s.e, r, common.RunEventTypeRunning, nil); err != nil { - return err - } } } return nil } -func (s *Scheduler) updateRunStatus(ctx context.Context, et *types.ExecutorTask) error { - log.Debugf("et: %s", util.Dump(et)) +func (s *Scheduler) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error { r, _, err := store.GetRun(ctx, s.e, et.RunID) if err != nil { return err } - log.Debugf("run: %s", util.Dump(r)) - rc, err := store.LTSGetRunConfig(s.wal, r.ID) if err != nil { return errors.Wrapf(err, "cannot get run config %q", r.ID) } - log.Debugf("rc: %s", util.Dump(rc)) + + if err := s.updateRunTaskStatus(ctx, et, r); err != nil { + return err + } + r, err = store.AtomicPutRun(ctx, s.e, r, nil, nil) + if err != nil { + return err + } + + return s.scheduleRun(ctx, r, rc) +} + +func (s *Scheduler) updateRunTaskStatus(ctx context.Context, et *types.ExecutorTask, r *types.Run) error { + log.Debugf("et: %s", util.Dump(et)) rt, ok := r.RunTasks[et.ID] if !ok { return errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID) } - rct, ok := rc.Tasks[rt.ID] - log.Debugf("rct: %s", util.Dump(rct)) - if !ok { - return errors.Errorf("no such run config task with id %s for run config %s", rt.ID, rc.ID) - } rt.StartTime = et.Status.StartTime rt.EndTime = et.Status.EndTime @@ -495,39 +573,17 @@ func (s *Scheduler) updateRunStatus(ctx context.Context, et *types.ExecutorTask) rt.Steps[i].EndTime = s.EndTime } - if rt.Status == types.RunTaskStatusFailed { - if !rct.IgnoreFailure { - s.failRun(r) - } - } - - var runEventType common.RunEventType - if r.Phase.IsFinished() { - switch r.Result { - case types.RunResultFailed: - runEventType = common.RunEventTypeFailed - } - } - - if _, err := store.AtomicPutRun(ctx, s.e, r, runEventType, nil); err != nil { - return err - } - - return s.advanceRun(ctx, r.ID) + return nil } -func (s *Scheduler) failRun(r *types.Run) { - r.Result = types.RunResultFailed -} - -func (s *Scheduler) runScheduler(ctx context.Context, c <-chan *types.ExecutorTask) { +func (s *Scheduler) executorTaskUpdateHandler(ctx context.Context, c <-chan *types.ExecutorTask) { for { select { case <-ctx.Done(): return case et := <-c: go func() { - if err := s.updateRunStatus(ctx, et); err != nil { + if err := s.handleExecutorTaskUpdate(ctx, et); err != nil { // TODO(sgotti) improve logging to not return "run modified errors" since // they are normal log.Warnf("err: %+v", err) @@ -666,7 +722,7 @@ func (s *Scheduler) runTasksUpdater(ctx context.Context) error { return err } et.Revision = kv.ModRevision - if err := s.updateRunStatus(ctx, et); err != nil { + if err := s.handleExecutorTaskUpdate(ctx, et); err != nil { log.Errorf("err: %v", err) } } @@ -750,7 +806,7 @@ func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID st } rt.SetupStep.LogPhase = types.RunTaskFetchPhaseFinished - if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil { + if _, err := store.AtomicPutRun(ctx, s.e, r, nil, nil); err != nil { return err } return nil @@ -770,7 +826,7 @@ func (s *Scheduler) finishStepLogPhase(ctx context.Context, runID, runTaskID str } rt.Steps[stepnum].LogPhase = types.RunTaskFetchPhaseFinished - if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil { + if _, err := store.AtomicPutRun(ctx, s.e, r, nil, nil); err != nil { return err } return nil @@ -800,7 +856,7 @@ func (s *Scheduler) finishArchivePhase(ctx context.Context, runID, runTaskID str return errors.Errorf("no workspace archive for task %s, step %d in run %s", runTaskID, stepnum, runID) } - if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil { + if _, err := store.AtomicPutRun(ctx, s.e, r, nil, nil); err != nil { return err } return nil @@ -941,11 +997,11 @@ func (s *Scheduler) fetcher(ctx context.Context) error { } -func (s *Scheduler) runUpdaterLoop(ctx context.Context) { +func (s *Scheduler) runsSchedulerLoop(ctx context.Context) { for { - log.Debugf("runUpdater") + log.Debugf("runsSchedulerLoop") - if err := s.runUpdater(ctx); err != nil { + if err := s.runsScheduler(ctx); err != nil { log.Errorf("err: %+v", err) } @@ -959,25 +1015,34 @@ func (s *Scheduler) runUpdaterLoop(ctx context.Context) { } } -func (s *Scheduler) runUpdater(ctx context.Context) error { - log.Debugf("runUpdater") +func (s *Scheduler) runsScheduler(ctx context.Context) error { + log.Debugf("runsScheduler") runs, err := store.GetRuns(ctx, s.e) if err != nil { return err } for _, r := range runs { - if err := s.advanceRun(ctx, r.ID); err != nil { + if err := s.runScheduler(ctx, r); err != nil { log.Errorf("err: %+v", err) - continue } } return nil } +func (s *Scheduler) runScheduler(ctx context.Context, r *types.Run) error { + log.Debugf("runScheduler") + rc, err := store.LTSGetRunConfig(s.wal, r.ID) + if err != nil { + return errors.Wrapf(err, "cannot get run config %q", r.ID) + } + + return s.scheduleRun(ctx, r, rc) +} + func (s *Scheduler) finishedRunsArchiverLoop(ctx context.Context) { for { - log.Debugf("finished run archiver") + log.Debugf("finished run archiver loop") if err := s.finishedRunsArchiver(ctx); err != nil { log.Errorf("err: %+v", err) @@ -1059,7 +1124,7 @@ func (s *Scheduler) finishedRunArchiver(ctx context.Context, r *types.Run) error } r.Archived = true - if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil { + if _, err := store.AtomicPutRun(ctx, s.e, r, nil, nil); err != nil { return err } @@ -1426,7 +1491,7 @@ func (s *Scheduler) Run(ctx context.Context) error { mainrouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) }) go s.executorTasksCleanerLoop(ctx) - go s.runUpdaterLoop(ctx) + go s.runsSchedulerLoop(ctx) go s.runTasksUpdaterLoop(ctx) go s.fetcherLoop(ctx) go s.finishedRunsArchiverLoop(ctx) @@ -1434,7 +1499,7 @@ func (s *Scheduler) Run(ctx context.Context) error { go s.dumpLTSCleanerLoop(ctx) go s.compactChangeGroupsLoop(ctx) - go s.runScheduler(ctx, ch) + go s.executorTaskUpdateHandler(ctx, ch) var tlsConfig *tls.Config if s.c.Web.TLS { diff --git a/internal/services/runservice/scheduler/store/store.go b/internal/services/runservice/scheduler/store/store.go index add2a6c..30c2a56 100644 --- a/internal/services/runservice/scheduler/store/store.go +++ b/internal/services/runservice/scheduler/store/store.go @@ -385,7 +385,7 @@ func GetRun(ctx context.Context, e *etcd.Store, runID string) (*types.Run, int64 return r, resp.Header.Revision, nil } -func AtomicPutRun(ctx context.Context, e *etcd.Store, r *types.Run, runEventType common.RunEventType, cgt *types.ChangeGroupsUpdateToken) (*types.Run, error) { +func AtomicPutRun(ctx context.Context, e *etcd.Store, r *types.Run, runEvent *common.RunEvent, cgt *types.ChangeGroupsUpdateToken) (*types.Run, error) { // insert only if the run as changed curRun, _, err := GetRun(ctx, e, r.ID) if err != nil && err != etcd.ErrKeyNotFound { @@ -438,11 +438,7 @@ func AtomicPutRun(ctx context.Context, e *etcd.Store, r *types.Run, runEventType } } - if runEventType != "" { - runEvent, err := common.NewRunEvent(ctx, e, runEventType, r.ID) - if err != nil { - return nil, err - } + if runEvent != nil { eventj, err := json.Marshal(runEvent) if err != nil { return nil, err