runservice: refactor scheduling logic

* split functions in sub parts to ease future testing
* save run fewer times
* rework events logic to considere both run phase and result changes (emit an
event on every phase or result change)
This commit is contained in:
Simone Gotti 2019-04-10 14:48:47 +02:00
parent da27348a1d
commit 751361daea
4 changed files with 179 additions and 122 deletions

View File

@ -84,7 +84,7 @@ func (s *CommandHandler) ChangeRunPhase(ctx context.Context, req *RunChangePhase
r.Stop = true
}
_, err = store.AtomicPutRun(ctx, s.e, r, "", cgt)
_, err = store.AtomicPutRun(ctx, s.e, r, nil, cgt)
return err
}
@ -112,7 +112,7 @@ func (s *CommandHandler) StopRun(ctx context.Context, req *RunStopRequest) error
r.Stop = true
}
_, err = store.AtomicPutRun(ctx, s.e, r, "", cgt)
_, err = store.AtomicPutRun(ctx, s.e, r, nil, cgt)
return err
}
@ -330,7 +330,11 @@ func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcg
return err
}
if _, err := store.AtomicPutRun(ctx, s.e, run, common.RunEventTypeQueued, runcgt); err != nil {
runEvent, err := common.NewRunEvent(ctx, s.e, run.ID, run.Phase, run.Result)
if err != nil {
return err
}
if _, err := store.AtomicPutRun(ctx, s.e, run, runEvent, runcgt); err != nil {
return err
}
return nil
@ -432,7 +436,7 @@ func (s *CommandHandler) ApproveRunTask(ctx context.Context, req *RunTaskApprove
task.Approved = true
task.ApprovalAnnotations = req.ApprovalAnnotations
_, err = store.AtomicPutRun(ctx, s.e, r, "", cgt)
_, err = store.AtomicPutRun(ctx, s.e, r, nil, cgt)
return err
}

View File

@ -19,28 +19,20 @@ import (
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/sequence"
)
type RunEventType string
const (
RunEventTypeQueued RunEventType = "queued"
RunEventTypeCancelled RunEventType = "cancelled"
RunEventTypeRunning RunEventType = "running"
RunEventTypeSuccess RunEventType = "success"
RunEventTypeFailed RunEventType = "failed"
"github.com/sorintlab/agola/internal/services/runservice/types"
)
type RunEvent struct {
Sequence string
EventType RunEventType
RunID string
Sequence string
RunID string
Phase types.RunPhase
Result types.RunResult
}
func NewRunEvent(ctx context.Context, e *etcd.Store, runEventType RunEventType, runID string) (*RunEvent, error) {
func NewRunEvent(ctx context.Context, e *etcd.Store, runID string, phase types.RunPhase, result types.RunResult) (*RunEvent, error) {
seq, err := sequence.IncSequence(ctx, e, EtcdRunEventSequenceKey)
if err != nil {
return nil, err
}
return &RunEvent{Sequence: seq.String(), EventType: runEventType, RunID: runID}, nil
return &RunEvent{Sequence: seq.String(), RunID: runID, Phase: phase, Result: result}, nil
}

View File

@ -81,15 +81,10 @@ func (s *Scheduler) runHasActiveTasks(ctx context.Context, runID string) (bool,
return activeTasks, nil
}
func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run) error {
func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig) error {
log.Debugf("run: %s", util.Dump(r))
rc, err := store.LTSGetRunConfig(s.wal, r.ID)
if err != nil {
return errors.Wrapf(err, "cannot get run config %q", r.ID)
}
log.Debugf("rc: %s", util.Dump(rc))
tasksToRun := []*types.RunTask{}
// get tasks that can be executed
for _, rt := range r.RunTasks {
log.Debugf("rt: %s", util.Dump(rt))
@ -116,6 +111,40 @@ func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run) error {
if rct.NeedsApproval && !rt.WaitingApproval && !rt.Approved {
rt.WaitingApproval = true
}
}
}
return nil
}
func (s *Scheduler) getTasksToRun(ctx context.Context, r *types.Run) ([]*types.RunTask, error) {
log.Debugf("run: %s", util.Dump(r))
rc, err := store.LTSGetRunConfig(s.wal, r.ID)
if err != nil {
return nil, errors.Wrapf(err, "cannot get run config %q", r.ID)
}
log.Debugf("rc: %s", util.Dump(rc))
tasksToRun := []*types.RunTask{}
// get tasks that can be executed
for _, rt := range r.RunTasks {
log.Debugf("rt: %s", util.Dump(rt))
if rt.Skip {
continue
}
if rt.Status != types.RunTaskStatusNotStarted {
continue
}
rct := rc.Tasks[rt.ID]
parents := runconfig.GetParents(rc.Tasks, rct)
canRun := true
for _, p := range parents {
rp := r.RunTasks[p.ID]
canRun = rp.Status.IsFinished() && rp.ArchivesFetchFinished()
}
if canRun {
// Run only if approved if needed
if !rct.NeedsApproval || (rct.NeedsApproval && rt.Approved) {
tasksToRun = append(tasksToRun, rt)
@ -123,18 +152,22 @@ func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run) error {
}
}
// save run since we may have changed some run tasks to waiting approval
if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil {
return err
}
return tasksToRun, nil
}
log.Debugf("tasksToRun: %s", util.Dump(tasksToRun))
func (s *Scheduler) submitRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig, tasks []*types.RunTask) error {
log.Debugf("tasksToRun: %s", util.Dump(tasks))
for _, rt := range tasksToRun {
et, err := s.genExecutorTask(ctx, r, rt, rc)
for _, rt := range tasks {
executor, err := s.chooseExecutor(ctx)
if err != nil {
return err
}
if executor == nil {
return errors.Errorf("cannot choose an executor")
}
et := s.genExecutorTask(ctx, r, rt, rc, executor)
log.Debugf("et: %s", util.Dump(et))
// check that the executorTask wasn't already scheduled
@ -173,15 +206,7 @@ func (s *Scheduler) chooseExecutor(ctx context.Context) (*types.Executor, error)
return nil, nil
}
func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig) (*types.ExecutorTask, error) {
executor, err := s.chooseExecutor(ctx)
if err != nil {
return nil, err
}
if executor == nil {
return nil, errors.Errorf("cannot choose an executor")
}
func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask {
rct := rc.Tasks[rt.ID]
environment := map[string]string{}
@ -238,7 +263,7 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types
et.Workspace = ws
return et, nil
return et
}
func (s *Scheduler) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error {
@ -329,15 +354,76 @@ func (s *Scheduler) compactChangeGroups(ctx context.Context) error {
return nil
}
func (s *Scheduler) advanceRun(ctx context.Context, runID string) error {
r, _, err := store.GetRun(ctx, s.e, runID)
if err != nil {
return errors.Wrapf(err, "cannot get run %q from etcd", runID)
func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.RunConfig) error {
log.Debugf("r: %s", util.Dump(r))
prevPhase := r.Phase
prevResult := r.Result
if err := s.advanceRun(ctx, r, rc); err != nil {
return err
}
var runEvent *common.RunEvent
// detect changes to phase and result and set related events
if prevPhase != r.Phase || prevResult != r.Result {
var err error
runEvent, err = common.NewRunEvent(ctx, s.e, r.ID, r.Phase, r.Result)
if err != nil {
return err
}
}
r, err := store.AtomicPutRun(ctx, s.e, r, runEvent, nil)
if err != nil {
return err
}
if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning {
if err := s.advanceRunTasks(ctx, r, rc); err != nil {
return err
}
r, err := store.AtomicPutRun(ctx, s.e, r, nil, nil)
if err != nil {
return err
}
tasksToRun, err := s.getTasksToRun(ctx, r)
if err != nil {
return err
}
return s.submitRunTasks(ctx, r, rc, tasksToRun)
}
return nil
}
// advanceRun updates the run result and phase. It must be the unique function that
// should update them.
func (s *Scheduler) advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig) error {
log.Debugf("run: %s", util.Dump(r))
switch {
case !r.Result.IsSet() && r.Phase == types.RunPhaseRunning:
// fail run if a task is failed
if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning {
for _, rt := range r.RunTasks {
rct, ok := rc.Tasks[rt.ID]
log.Debugf("rct: %s", util.Dump(rct))
if !ok {
return errors.Errorf("no such run config task with id %s for run config %s", rt.ID, rc.ID)
}
if rt.Status == types.RunTaskStatusFailed {
if !rct.IgnoreFailure {
log.Debugf("marking run %q as failed is task %q is failed", r.ID, rt.ID)
r.Result = types.RunResultFailed
break
}
}
}
}
// see if run could me marked as success
if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning {
finished := true
for _, rt := range r.RunTasks {
if !rt.Status.IsFinished() {
@ -346,26 +432,19 @@ func (s *Scheduler) advanceRun(ctx context.Context, runID string) error {
}
if finished {
r.Result = types.RunResultSuccess
if _, err := store.AtomicPutRun(ctx, s.e, r, common.RunEventTypeSuccess, nil); err != nil {
return err
}
return nil
}
}
// if run is set to stop set result as stopped
if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning {
if r.Stop {
r.Result = types.RunResultStopped
}
if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil {
return err
}
if err := s.advanceRunTasks(ctx, r); err != nil {
return err
}
}
// if the run has a result defined then we can stop current tasks
case r.Result.IsSet():
if r.Result.IsSet() {
if !r.Phase.IsFinished() {
hasRunningTasks, err := s.runHasActiveTasks(ctx, r.ID)
if err != nil {
@ -376,9 +455,6 @@ func (s *Scheduler) advanceRun(ctx context.Context, runID string) error {
if !hasRunningTasks {
r.ChangePhase(types.RunPhaseFinished)
}
if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil {
return err
}
}
// if the run is finished AND there're no executor tasks scheduled we can mark
@ -396,38 +472,40 @@ func (s *Scheduler) advanceRun(ctx context.Context, runID string) error {
}
}
}
if _, err := store.AtomicPutRun(ctx, s.e, r, common.RunEventTypeRunning, nil); err != nil {
return err
}
}
}
return nil
}
func (s *Scheduler) updateRunStatus(ctx context.Context, et *types.ExecutorTask) error {
log.Debugf("et: %s", util.Dump(et))
func (s *Scheduler) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error {
r, _, err := store.GetRun(ctx, s.e, et.RunID)
if err != nil {
return err
}
log.Debugf("run: %s", util.Dump(r))
rc, err := store.LTSGetRunConfig(s.wal, r.ID)
if err != nil {
return errors.Wrapf(err, "cannot get run config %q", r.ID)
}
log.Debugf("rc: %s", util.Dump(rc))
if err := s.updateRunTaskStatus(ctx, et, r); err != nil {
return err
}
r, err = store.AtomicPutRun(ctx, s.e, r, nil, nil)
if err != nil {
return err
}
return s.scheduleRun(ctx, r, rc)
}
func (s *Scheduler) updateRunTaskStatus(ctx context.Context, et *types.ExecutorTask, r *types.Run) error {
log.Debugf("et: %s", util.Dump(et))
rt, ok := r.RunTasks[et.ID]
if !ok {
return errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID)
}
rct, ok := rc.Tasks[rt.ID]
log.Debugf("rct: %s", util.Dump(rct))
if !ok {
return errors.Errorf("no such run config task with id %s for run config %s", rt.ID, rc.ID)
}
rt.StartTime = et.Status.StartTime
rt.EndTime = et.Status.EndTime
@ -495,39 +573,17 @@ func (s *Scheduler) updateRunStatus(ctx context.Context, et *types.ExecutorTask)
rt.Steps[i].EndTime = s.EndTime
}
if rt.Status == types.RunTaskStatusFailed {
if !rct.IgnoreFailure {
s.failRun(r)
}
}
var runEventType common.RunEventType
if r.Phase.IsFinished() {
switch r.Result {
case types.RunResultFailed:
runEventType = common.RunEventTypeFailed
}
}
if _, err := store.AtomicPutRun(ctx, s.e, r, runEventType, nil); err != nil {
return err
}
return s.advanceRun(ctx, r.ID)
return nil
}
func (s *Scheduler) failRun(r *types.Run) {
r.Result = types.RunResultFailed
}
func (s *Scheduler) runScheduler(ctx context.Context, c <-chan *types.ExecutorTask) {
func (s *Scheduler) executorTaskUpdateHandler(ctx context.Context, c <-chan *types.ExecutorTask) {
for {
select {
case <-ctx.Done():
return
case et := <-c:
go func() {
if err := s.updateRunStatus(ctx, et); err != nil {
if err := s.handleExecutorTaskUpdate(ctx, et); err != nil {
// TODO(sgotti) improve logging to not return "run modified errors" since
// they are normal
log.Warnf("err: %+v", err)
@ -666,7 +722,7 @@ func (s *Scheduler) runTasksUpdater(ctx context.Context) error {
return err
}
et.Revision = kv.ModRevision
if err := s.updateRunStatus(ctx, et); err != nil {
if err := s.handleExecutorTaskUpdate(ctx, et); err != nil {
log.Errorf("err: %v", err)
}
}
@ -750,7 +806,7 @@ func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID st
}
rt.SetupStep.LogPhase = types.RunTaskFetchPhaseFinished
if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil {
if _, err := store.AtomicPutRun(ctx, s.e, r, nil, nil); err != nil {
return err
}
return nil
@ -770,7 +826,7 @@ func (s *Scheduler) finishStepLogPhase(ctx context.Context, runID, runTaskID str
}
rt.Steps[stepnum].LogPhase = types.RunTaskFetchPhaseFinished
if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil {
if _, err := store.AtomicPutRun(ctx, s.e, r, nil, nil); err != nil {
return err
}
return nil
@ -800,7 +856,7 @@ func (s *Scheduler) finishArchivePhase(ctx context.Context, runID, runTaskID str
return errors.Errorf("no workspace archive for task %s, step %d in run %s", runTaskID, stepnum, runID)
}
if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil {
if _, err := store.AtomicPutRun(ctx, s.e, r, nil, nil); err != nil {
return err
}
return nil
@ -941,11 +997,11 @@ func (s *Scheduler) fetcher(ctx context.Context) error {
}
func (s *Scheduler) runUpdaterLoop(ctx context.Context) {
func (s *Scheduler) runsSchedulerLoop(ctx context.Context) {
for {
log.Debugf("runUpdater")
log.Debugf("runsSchedulerLoop")
if err := s.runUpdater(ctx); err != nil {
if err := s.runsScheduler(ctx); err != nil {
log.Errorf("err: %+v", err)
}
@ -959,25 +1015,34 @@ func (s *Scheduler) runUpdaterLoop(ctx context.Context) {
}
}
func (s *Scheduler) runUpdater(ctx context.Context) error {
log.Debugf("runUpdater")
func (s *Scheduler) runsScheduler(ctx context.Context) error {
log.Debugf("runsScheduler")
runs, err := store.GetRuns(ctx, s.e)
if err != nil {
return err
}
for _, r := range runs {
if err := s.advanceRun(ctx, r.ID); err != nil {
if err := s.runScheduler(ctx, r); err != nil {
log.Errorf("err: %+v", err)
continue
}
}
return nil
}
func (s *Scheduler) runScheduler(ctx context.Context, r *types.Run) error {
log.Debugf("runScheduler")
rc, err := store.LTSGetRunConfig(s.wal, r.ID)
if err != nil {
return errors.Wrapf(err, "cannot get run config %q", r.ID)
}
return s.scheduleRun(ctx, r, rc)
}
func (s *Scheduler) finishedRunsArchiverLoop(ctx context.Context) {
for {
log.Debugf("finished run archiver")
log.Debugf("finished run archiver loop")
if err := s.finishedRunsArchiver(ctx); err != nil {
log.Errorf("err: %+v", err)
@ -1059,7 +1124,7 @@ func (s *Scheduler) finishedRunArchiver(ctx context.Context, r *types.Run) error
}
r.Archived = true
if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil {
if _, err := store.AtomicPutRun(ctx, s.e, r, nil, nil); err != nil {
return err
}
@ -1426,7 +1491,7 @@ func (s *Scheduler) Run(ctx context.Context) error {
mainrouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) })
go s.executorTasksCleanerLoop(ctx)
go s.runUpdaterLoop(ctx)
go s.runsSchedulerLoop(ctx)
go s.runTasksUpdaterLoop(ctx)
go s.fetcherLoop(ctx)
go s.finishedRunsArchiverLoop(ctx)
@ -1434,7 +1499,7 @@ func (s *Scheduler) Run(ctx context.Context) error {
go s.dumpLTSCleanerLoop(ctx)
go s.compactChangeGroupsLoop(ctx)
go s.runScheduler(ctx, ch)
go s.executorTaskUpdateHandler(ctx, ch)
var tlsConfig *tls.Config
if s.c.Web.TLS {

View File

@ -385,7 +385,7 @@ func GetRun(ctx context.Context, e *etcd.Store, runID string) (*types.Run, int64
return r, resp.Header.Revision, nil
}
func AtomicPutRun(ctx context.Context, e *etcd.Store, r *types.Run, runEventType common.RunEventType, cgt *types.ChangeGroupsUpdateToken) (*types.Run, error) {
func AtomicPutRun(ctx context.Context, e *etcd.Store, r *types.Run, runEvent *common.RunEvent, cgt *types.ChangeGroupsUpdateToken) (*types.Run, error) {
// insert only if the run as changed
curRun, _, err := GetRun(ctx, e, r.ID)
if err != nil && err != etcd.ErrKeyNotFound {
@ -438,11 +438,7 @@ func AtomicPutRun(ctx context.Context, e *etcd.Store, r *types.Run, runEventType
}
}
if runEventType != "" {
runEvent, err := common.NewRunEvent(ctx, e, runEventType, r.ID)
if err != nil {
return nil, err
}
if runEvent != nil {
eventj, err := json.Marshal(runEvent)
if err != nil {
return nil, err