diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/scheduler/api/api.go index 6777e3d..9bbcd6e 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/scheduler/api/api.go @@ -414,7 +414,15 @@ func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type RunCreateRequest struct { - RunConfig *types.RunConfig `json:"run_config"` + // new run + RunConfig *types.RunConfig `json:"run_config"` + + // existing run + RunID string `json:"run_id"` + RunConfigID string `json:"run_config_id"` + FromStart bool `json:"from_start"` + ResetTasks []string `json:"reset_tasks"` + Group string `json:"group"` Environment map[string]string `json:"environment"` Annotations map[string]string `json:"annotations"` @@ -445,15 +453,31 @@ func (h *RunCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { creq := &command.RunCreateRequest{ RunConfig: req.RunConfig, + RunID: req.RunID, + RunConfigID: req.RunConfigID, + FromStart: req.FromStart, + ResetTasks: req.ResetTasks, Group: req.Group, Environment: req.Environment, Annotations: req.Annotations, ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken, } - if err := h.ch.CreateRun(ctx, creq); err != nil { + rb, err := h.ch.CreateRun(ctx, creq) + if err != nil { + h.log.Errorf("err: %+v", err) http.Error(w, err.Error(), http.StatusBadRequest) return } + + res := &RunResponse{ + Run: rb.Run, + RunConfig: rb.Rc, + } + + if err := json.NewEncoder(w).Encode(res); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } } type RunActionType string diff --git a/internal/services/runservice/scheduler/command/command.go b/internal/services/runservice/scheduler/command/command.go index cdd0371..6cbf012 100644 --- a/internal/services/runservice/scheduler/command/command.go +++ b/internal/services/runservice/scheduler/command/command.go @@ -18,6 +18,7 @@ import ( "context" "time" + uuid "github.com/satori/go.uuid" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/runconfig" @@ -79,38 +80,55 @@ func (s *CommandHandler) ChangeRunPhase(ctx context.Context, req *RunChangePhase type RunCreateRequest struct { RunConfig *types.RunConfig + RunID string + RunConfigID string + FromStart bool + ResetTasks []string Group string Environment map[string]string Annotations map[string]string ChangeGroupsUpdateToken string } -func (s *CommandHandler) CreateRun(ctx context.Context, req *RunCreateRequest) error { +func (s *CommandHandler) CreateRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) { runcgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken) if err != nil { - return err + return nil, err } + var rb *types.RunBundle + if req.RunID == "" { + rb, err = s.newRun(ctx, req) + } else { + rb, err = s.recreateRun(ctx, req) + } + if err != nil { + return nil, err + } + + return rb, s.saveRun(ctx, rb, runcgt) +} + +func (s *CommandHandler) newRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) { rc := req.RunConfig + var run *types.Run // generate a new run sequence that will be the same for the run, runconfig and rundata seq, err := sequence.IncSequence(ctx, s.e, common.EtcdRunSequenceKey) if err != nil { - return err + return nil, err } id := seq.String() - // TODO(sgotti) validate run config if err := runconfig.CheckRunConfig(rc); err != nil { - return err + return nil, err } - // set the run config ID rc.ID = id // generate tasks levels if err := runconfig.GenTasksLevels(rc); err != nil { - return err + return nil, err } rd := &types.RunData{ @@ -120,12 +138,110 @@ func (s *CommandHandler) CreateRun(ctx context.Context, req *RunCreateRequest) e Annotations: req.Annotations, } - run, err := s.genRun(ctx, rc, rd) + run, err = s.genRun(ctx, rc, rd) if err != nil { - return err + return nil, err } s.log.Debugf("created run: %s", util.Dump(run)) + return &types.RunBundle{ + Run: run, + Rc: rc, + Rd: rd, + }, nil +} + +func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) { + // generate a new run sequence that will be the same for the run, runconfig and rundata + seq, err := sequence.IncSequence(ctx, s.e, common.EtcdRunSequenceKey) + if err != nil { + return nil, err + } + id := seq.String() + + // fetch the existing runconfig, rundata and run + s.log.Infof("creating run from existing run") + rc, err := store.LTSGetRunConfig(s.wal, req.RunID) + if err != nil { + return nil, errors.Wrapf(err, "runconfig %q doens't exist", req.RunID) + } + // update the run config ID + rc.ID = id + + rd, err := store.LTSGetRunData(s.wal, req.RunID) + if err != nil { + return nil, errors.Wrapf(err, "rundata %q doens't exist", req.RunID) + } + // update the run data ID + rd.ID = id + + run, err := store.GetRunEtcdOrLTS(ctx, s.e, s.wal, req.RunID) + if err != nil { + return nil, err + } + if run == nil { + return nil, errors.Wrapf(err, "run %q doens't exist", req.RunID) + } + + // update the run ID + run.ID = id + // reset run revision + run.Revision = 0 + // reset phase/result/archived/stop + run.Phase = types.RunPhaseQueued + run.Result = types.RunResultUnknown + run.Archived = false + + // TODO(sgotti) handle reset tasks + // currently we only restart a run resetting al failed tasks + tasksToAdd := []*types.RunTask{} + tasksToDelete := []string{} + + s.log.Infof("fromStart: %t", req.FromStart) + for _, rt := range run.RunTasks { + if req.FromStart || rt.Status != types.RunTaskStatusSuccess { + rct := rc.Tasks[rt.ID] + // change rct id + rct.ID = uuid.NewV4().String() + + // update runconfig + delete(rc.Tasks, rt.ID) + rc.Tasks[rct.ID] = rct + // update other tasks depends to new task id + for _, t := range rc.Tasks { + for _, d := range t.Depends { + if d.TaskID == rt.ID { + d.TaskID = rct.ID + } + } + } + + nrt := s.genRunTask(ctx, rct) + tasksToAdd = append(tasksToAdd, nrt) + tasksToDelete = append(tasksToDelete, rt.ID) + } + } + for _, rt := range tasksToAdd { + run.RunTasks[rt.ID] = rt + } + for _, rtID := range tasksToDelete { + delete(run.RunTasks, rtID) + } + + s.log.Debugf("created run from existing run: %s", util.Dump(run)) + + return &types.RunBundle{ + Run: run, + Rc: rc, + Rd: rd, + }, nil +} + +func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcgt *types.ChangeGroupsUpdateToken) error { + run := rb.Run + rc := rb.Rc + rd := rb.Rd + c, cgt, err := store.LTSGetRunCounter(s.wal, run.Group) s.log.Infof("c: %d, cgt: %s", c, util.Dump(cgt)) if err != nil && err != objectstorage.ErrNotExist { diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go index b348027..55e9236 100644 --- a/internal/services/runservice/types/types.go +++ b/internal/services/runservice/types/types.go @@ -22,6 +22,12 @@ import ( "github.com/sorintlab/agola/internal/util" ) +type RunBundle struct { + Run *Run + Rc *RunConfig + Rd *RunData +} + type SortOrder int const (