diff --git a/internal/services/runservice/scheduler/command/command.go b/internal/services/runservice/scheduler/action/action.go similarity index 80% rename from internal/services/runservice/scheduler/command/command.go rename to internal/services/runservice/scheduler/action/action.go index e8fe910..f4cf907 100644 --- a/internal/services/runservice/scheduler/command/command.go +++ b/internal/services/runservice/scheduler/action/action.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package command +package action import ( "context" @@ -36,7 +36,7 @@ import ( "go.uber.org/zap" ) -type CommandHandler struct { +type ActionHandler struct { log *zap.SugaredLogger e *etcd.Store readDB *readdb.ReadDB @@ -44,8 +44,8 @@ type CommandHandler struct { dm *datamanager.DataManager } -func NewCommandHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) *CommandHandler { - return &CommandHandler{ +func NewActionHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) *ActionHandler { + return &ActionHandler{ log: logger.Sugar(), e: e, readDB: readDB, @@ -60,13 +60,13 @@ type RunChangePhaseRequest struct { ChangeGroupsUpdateToken string } -func (s *CommandHandler) ChangeRunPhase(ctx context.Context, req *RunChangePhaseRequest) error { +func (h *ActionHandler) ChangeRunPhase(ctx context.Context, req *RunChangePhaseRequest) error { cgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken) if err != nil { return err } - r, _, err := store.GetRun(ctx, s.e, req.RunID) + r, _, err := store.GetRun(ctx, h.e, req.RunID) if err != nil { return err } @@ -84,7 +84,7 @@ func (s *CommandHandler) ChangeRunPhase(ctx context.Context, req *RunChangePhase r.Stop = true } - _, err = store.AtomicPutRun(ctx, s.e, r, nil, cgt) + _, err = store.AtomicPutRun(ctx, h.e, r, nil, cgt) return err } @@ -93,13 +93,13 @@ type RunStopRequest struct { ChangeGroupsUpdateToken string } -func (s *CommandHandler) StopRun(ctx context.Context, req *RunStopRequest) error { +func (h *ActionHandler) StopRun(ctx context.Context, req *RunStopRequest) error { cgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken) if err != nil { return err } - r, _, err := store.GetRun(ctx, s.e, req.RunID) + r, _, err := store.GetRun(ctx, h.e, req.RunID) if err != nil { return err } @@ -112,7 +112,7 @@ func (s *CommandHandler) StopRun(ctx context.Context, req *RunStopRequest) error r.Stop = true } - _, err = store.AtomicPutRun(ctx, s.e, r, nil, cgt) + _, err = store.AtomicPutRun(ctx, h.e, r, nil, cgt) return err } @@ -135,7 +135,7 @@ type RunCreateRequest struct { ChangeGroupsUpdateToken string } -func (s *CommandHandler) CreateRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) { +func (h *ActionHandler) CreateRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) { runcgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken) if err != nil { return nil, err @@ -143,18 +143,18 @@ func (s *CommandHandler) CreateRun(ctx context.Context, req *RunCreateRequest) ( var rb *types.RunBundle if req.RunID == "" { - rb, err = s.newRun(ctx, req) + rb, err = h.newRun(ctx, req) } else { - rb, err = s.recreateRun(ctx, req) + rb, err = h.recreateRun(ctx, req) } if err != nil { return nil, err } - return rb, s.saveRun(ctx, rb, runcgt) + return rb, h.saveRun(ctx, rb, runcgt) } -func (s *CommandHandler) newRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) { +func (h *ActionHandler) newRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) { rcts := req.RunConfigTasks setupErrors := req.SetupErrors @@ -169,21 +169,21 @@ func (s *CommandHandler) newRun(ctx context.Context, req *RunCreateRequest) (*ty } // generate a new run sequence that will be the same for the run and runconfig - seq, err := sequence.IncSequence(ctx, s.e, common.EtcdRunSequenceKey) + seq, err := sequence.IncSequence(ctx, h.e, common.EtcdRunSequenceKey) if err != nil { return nil, err } id := seq.String() if err := runconfig.CheckRunConfigTasks(rcts); err != nil { - s.log.Errorf("check run config tasks failed: %+v", err) + h.log.Errorf("check run config tasks failed: %+v", err) setupErrors = append(setupErrors, err.Error()) } // generate tasks levels if len(setupErrors) == 0 { if err := runconfig.GenTasksLevels(rcts); err != nil { - s.log.Errorf("gen tasks leveles failed: %+v", err) + h.log.Errorf("gen tasks leveles failed: %+v", err) setupErrors = append(setupErrors, err.Error()) } } @@ -200,7 +200,7 @@ func (s *CommandHandler) newRun(ctx context.Context, req *RunCreateRequest) (*ty } run := genRun(rc) - s.log.Debugf("created run: %s", util.Dump(run)) + h.log.Debugf("created run: %s", util.Dump(run)) return &types.RunBundle{ Run: run, @@ -208,22 +208,22 @@ func (s *CommandHandler) newRun(ctx context.Context, req *RunCreateRequest) (*ty }, nil } -func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) { +func (h *ActionHandler) recreateRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) { // generate a new run sequence that will be the same for the run and runconfig - seq, err := sequence.IncSequence(ctx, s.e, common.EtcdRunSequenceKey) + seq, err := sequence.IncSequence(ctx, h.e, common.EtcdRunSequenceKey) if err != nil { return nil, err } id := seq.String() // fetch the existing runconfig and run - s.log.Infof("creating run from existing run") - rc, err := store.OSTGetRunConfig(s.dm, req.RunID) + h.log.Infof("creating run from existing run") + rc, err := store.OSTGetRunConfig(h.dm, req.RunID) if err != nil { return nil, util.NewErrBadRequest(errors.Wrapf(err, "runconfig %q doens't exist", req.RunID)) } - run, err := store.GetRunEtcdOrOST(ctx, s.e, s.dm, req.RunID) + run, err := store.GetRunEtcdOrOST(ctx, h.e, h.dm, req.RunID) if err != nil { return nil, err } @@ -231,8 +231,8 @@ func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest) return nil, util.NewErrBadRequest(errors.Wrapf(err, "run %q doens't exist", req.RunID)) } - s.log.Infof("rc: %s", util.Dump(rc)) - s.log.Infof("run: %s", util.Dump(run)) + h.log.Infof("rc: %s", util.Dump(rc)) + h.log.Infof("run: %s", util.Dump(run)) if req.FromStart { if canRestart, reason := run.CanRestartFromScratch(); !canRestart { @@ -246,8 +246,8 @@ func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest) rb := recreateRun(util.DefaultUUIDGenerator{}, run, rc, id, req) - s.log.Infof("created rc from existing rc: %s", util.Dump(rb.Rc)) - s.log.Infof("created run from existing run: %s", util.Dump(rb.Run)) + h.log.Infof("created rc from existing rc: %s", util.Dump(rb.Rc)) + h.log.Infof("created run from existing run: %s", util.Dump(rb.Run)) return rb, nil } @@ -365,12 +365,12 @@ func recreateRun(uuid util.UUIDGenerator, run *types.Run, rc *types.RunConfig, n } } -func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcgt *types.ChangeGroupsUpdateToken) error { +func (h *ActionHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcgt *types.ChangeGroupsUpdateToken) error { run := rb.Run rc := rb.Rc - c, cgt, err := s.getRunCounter(run.Group) - s.log.Infof("c: %d, cgt: %s", c, util.Dump(cgt)) + c, cgt, err := h.getRunCounter(run.Group) + h.log.Infof("c: %d, cgt: %s", c, util.Dump(cgt)) if err != nil { return err } @@ -395,15 +395,15 @@ func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcg } actions = append(actions, rca) - if _, err = s.dm.WriteWal(ctx, actions, cgt); err != nil { + if _, err = h.dm.WriteWal(ctx, actions, cgt); err != nil { return err } - runEvent, err := common.NewRunEvent(ctx, s.e, run.ID, run.Phase, run.Result) + runEvent, err := common.NewRunEvent(ctx, h.e, run.ID, run.Phase, run.Result) if err != nil { return err } - if _, err := store.AtomicPutRun(ctx, s.e, run, runEvent, runcgt); err != nil { + if _, err := store.AtomicPutRun(ctx, h.e, run, runEvent, runcgt); err != nil { return err } return nil @@ -476,13 +476,13 @@ type RunTaskApproveRequest struct { ChangeGroupsUpdateToken string } -func (s *CommandHandler) ApproveRunTask(ctx context.Context, req *RunTaskApproveRequest) error { +func (h *ActionHandler) ApproveRunTask(ctx context.Context, req *RunTaskApproveRequest) error { cgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken) if err != nil { return err } - r, _, err := store.GetRun(ctx, s.e, req.RunID) + r, _, err := store.GetRun(ctx, h.e, req.RunID) if err != nil { return err } @@ -504,13 +504,13 @@ func (s *CommandHandler) ApproveRunTask(ctx context.Context, req *RunTaskApprove task.Approved = true task.ApprovalAnnotations = req.ApprovalAnnotations - _, err = store.AtomicPutRun(ctx, s.e, r, nil, cgt) + _, err = store.AtomicPutRun(ctx, h.e, r, nil, cgt) return err } -func (s *CommandHandler) DeleteExecutor(ctx context.Context, executorID string) error { +func (h *ActionHandler) DeleteExecutor(ctx context.Context, executorID string) error { // mark all executor tasks as failed - ets, err := store.GetExecutorTasks(ctx, s.e, executorID) + ets, err := store.GetExecutorTasks(ctx, h.e, executorID) if err != nil { return err } @@ -518,20 +518,20 @@ func (s *CommandHandler) DeleteExecutor(ctx context.Context, executorID string) for _, et := range ets { et.Status.Phase = types.ExecutorTaskPhaseFailed et.FailError = "executor deleted" - if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { + if _, err := store.AtomicPutExecutorTask(ctx, h.e, et); err != nil { return err } } // delete the executor - if err := store.DeleteExecutor(ctx, s.e, executorID); err != nil { + if err := store.DeleteExecutor(ctx, h.e, executorID); err != nil { return err } return nil } -func (s *CommandHandler) getRunCounter(group string) (uint64, *datamanager.ChangeGroupsUpdateToken, error) { +func (h *ActionHandler) getRunCounter(group string) (uint64, *datamanager.ChangeGroupsUpdateToken, error) { // use the first group dir after the root pl := util.PathList(group) if len(pl) < 2 { @@ -540,13 +540,13 @@ func (s *CommandHandler) getRunCounter(group string) (uint64, *datamanager.Chang var c uint64 var cgt *datamanager.ChangeGroupsUpdateToken - err := s.readDB.Do(func(tx *db.Tx) error { + err := h.readDB.Do(func(tx *db.Tx) error { var err error - c, err = s.readDB.GetRunCounterOST(tx, pl[1]) + c, err = h.readDB.GetRunCounterOST(tx, pl[1]) if err != nil { return err } - cgt, err = s.readDB.GetChangeGroupsUpdateTokensOST(tx, []string{"counter-" + pl[1]}) + cgt, err = h.readDB.GetChangeGroupsUpdateTokensOST(tx, []string{"counter-" + pl[1]}) return err }) if err != nil { diff --git a/internal/services/runservice/scheduler/command/command_test.go b/internal/services/runservice/scheduler/action/action_test.go similarity index 99% rename from internal/services/runservice/scheduler/command/command_test.go rename to internal/services/runservice/scheduler/action/action_test.go index 9a7d685..585a424 100644 --- a/internal/services/runservice/scheduler/command/command_test.go +++ b/internal/services/runservice/scheduler/action/action_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package command +package action import ( "testing" @@ -167,7 +167,7 @@ func TestRecreateRun(t *testing.T) { // initial run that matched the runconfig, all tasks are not started or skipped // (if the runconfig task as Skip == true). This must match the status - // generated by command.genRun() + // generated by action.genRun() run := genRun(rc) outrun := genRun(outrc) diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/scheduler/api/api.go index f124b65..61fe91c 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/scheduler/api/api.go @@ -27,7 +27,7 @@ import ( "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/command" + "github.com/sorintlab/agola/internal/services/runservice/scheduler/action" "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" "github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb" "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" @@ -501,13 +501,13 @@ type RunCreateRequest struct { type RunCreateHandler struct { log *zap.SugaredLogger - ch *command.CommandHandler + ah *action.ActionHandler } -func NewRunCreateHandler(logger *zap.Logger, ch *command.CommandHandler) *RunCreateHandler { +func NewRunCreateHandler(logger *zap.Logger, ah *action.ActionHandler) *RunCreateHandler { return &RunCreateHandler{ log: logger.Sugar(), - ch: ch, + ah: ah, } } @@ -521,7 +521,7 @@ func (h *RunCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - creq := &command.RunCreateRequest{ + creq := &action.RunCreateRequest{ RunConfigTasks: req.RunConfigTasks, Name: req.Name, Group: req.Group, @@ -536,7 +536,7 @@ func (h *RunCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Annotations: req.Annotations, ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken, } - rb, err := h.ch.CreateRun(ctx, creq) + rb, err := h.ah.CreateRun(ctx, creq) if err != nil { h.log.Errorf("err: %+v", err) httpError(w, err) @@ -569,14 +569,14 @@ type RunActionsRequest struct { type RunActionsHandler struct { log *zap.SugaredLogger - ch *command.CommandHandler + ah *action.ActionHandler readDB *readdb.ReadDB } -func NewRunActionsHandler(logger *zap.Logger, ch *command.CommandHandler) *RunActionsHandler { +func NewRunActionsHandler(logger *zap.Logger, ah *action.ActionHandler) *RunActionsHandler { return &RunActionsHandler{ log: logger.Sugar(), - ch: ch, + ah: ah, } } @@ -594,22 +594,22 @@ func (h *RunActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch req.ActionType { case RunActionTypeChangePhase: - creq := &command.RunChangePhaseRequest{ + creq := &action.RunChangePhaseRequest{ RunID: runID, Phase: req.Phase, ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken, } - if err := h.ch.ChangeRunPhase(ctx, creq); err != nil { + if err := h.ah.ChangeRunPhase(ctx, creq); err != nil { h.log.Errorf("err: %+v", err) httpError(w, err) return } case RunActionTypeStop: - creq := &command.RunStopRequest{ + creq := &action.RunStopRequest{ RunID: runID, ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken, } - if err := h.ch.StopRun(ctx, creq); err != nil { + if err := h.ah.StopRun(ctx, creq); err != nil { h.log.Errorf("err: %+v", err) httpError(w, err) return @@ -634,14 +634,14 @@ type RunTaskActionsRequest struct { type RunTaskActionsHandler struct { log *zap.SugaredLogger - ch *command.CommandHandler + ah *action.ActionHandler readDB *readdb.ReadDB } -func NewRunTaskActionsHandler(logger *zap.Logger, ch *command.CommandHandler) *RunTaskActionsHandler { +func NewRunTaskActionsHandler(logger *zap.Logger, ah *action.ActionHandler) *RunTaskActionsHandler { return &RunTaskActionsHandler{ log: logger.Sugar(), - ch: ch, + ah: ah, } } @@ -660,12 +660,12 @@ func (h *RunTaskActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request switch req.ActionType { case RunTaskActionTypeApprove: - creq := &command.RunTaskApproveRequest{ + creq := &action.RunTaskApproveRequest{ RunID: runID, TaskID: taskID, ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken, } - if err := h.ch.ApproveRunTask(ctx, creq); err != nil { + if err := h.ah.ApproveRunTask(ctx, creq); err != nil { h.log.Errorf("err: %+v", err) httpError(w, err) return diff --git a/internal/services/runservice/scheduler/api/executor.go b/internal/services/runservice/scheduler/api/executor.go index e4cb9b7..8cecdec 100644 --- a/internal/services/runservice/scheduler/api/executor.go +++ b/internal/services/runservice/scheduler/api/executor.go @@ -26,7 +26,7 @@ import ( "github.com/gorilla/mux" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/command" + "github.com/sorintlab/agola/internal/services/runservice/scheduler/action" "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" "github.com/sorintlab/agola/internal/services/runservice/types" @@ -36,11 +36,11 @@ import ( type ExecutorStatusHandler struct { log *zap.SugaredLogger e *etcd.Store - ch *command.CommandHandler + ah *action.ActionHandler } -func NewExecutorStatusHandler(logger *zap.Logger, e *etcd.Store, ch *command.CommandHandler) *ExecutorStatusHandler { - return &ExecutorStatusHandler{log: logger.Sugar(), e: e, ch: ch} +func NewExecutorStatusHandler(logger *zap.Logger, e *etcd.Store, ah *action.ActionHandler) *ExecutorStatusHandler { + return &ExecutorStatusHandler{log: logger.Sugar(), e: e, ah: ah} } func (h *ExecutorStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -92,7 +92,7 @@ func (h *ExecutorStatusHandler) deleteStaleExecutors(ctx context.Context, curExe } } if !active { - if err := h.ch.DeleteExecutor(ctx, executor.ID); err != nil { + if err := h.ah.DeleteExecutor(ctx, executor.ID); err != nil { h.log.Errorf("failed to delete executor %q: %v", executor.ID, err) } } @@ -429,13 +429,13 @@ func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { type ExecutorDeleteHandler struct { log *zap.SugaredLogger - ch *command.CommandHandler + ah *action.ActionHandler } -func NewExecutorDeleteHandler(logger *zap.Logger, ch *command.CommandHandler) *ExecutorDeleteHandler { +func NewExecutorDeleteHandler(logger *zap.Logger, ah *action.ActionHandler) *ExecutorDeleteHandler { return &ExecutorDeleteHandler{ log: logger.Sugar(), - ch: ch, + ah: ah, } } @@ -450,7 +450,7 @@ func (h *ExecutorDeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request return } - if err := h.ch.DeleteExecutor(ctx, executorID); err != nil { + if err := h.ah.DeleteExecutor(ctx, executorID); err != nil { http.Error(w, "", http.StatusInternalServerError) return } diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 759d22d..d75abf3 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -34,8 +34,8 @@ import ( "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/runconfig" "github.com/sorintlab/agola/internal/services/config" + "github.com/sorintlab/agola/internal/services/runservice/scheduler/action" "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/command" "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" "github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb" "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" @@ -1399,7 +1399,7 @@ type Scheduler struct { ost *objectstorage.ObjStorage dm *datamanager.DataManager readDB *readdb.ReadDB - ch *command.CommandHandler + ah *action.ActionHandler } func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Scheduler, error) { @@ -1443,8 +1443,8 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule } s.readDB = readDB - ch := command.NewCommandHandler(logger, e, readDB, ost, dm) - s.ch = ch + ah := action.NewActionHandler(logger, e, readDB, ost, dm) + s.ah = ah return s, nil } @@ -1497,7 +1497,7 @@ func (s *Scheduler) Run(ctx context.Context) error { corsHandler = ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions) // executor dedicated api, only calls from executor should happen on these handlers - executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ch) + executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah) executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch) executorTaskHandler := api.NewExecutorTaskHandler(s.e) executorTasksHandler := api.NewExecutorTasksHandler(s.e) @@ -1506,15 +1506,15 @@ func (s *Scheduler) Run(ctx context.Context) error { cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost) // api from clients - executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ch) + executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ah) logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.dm) runHandler := api.NewRunHandler(logger, s.e, s.dm, s.readDB) - runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ch) + runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ah) runsHandler := api.NewRunsHandler(logger, s.readDB) - runActionsHandler := api.NewRunActionsHandler(logger, s.ch) - runCreateHandler := api.NewRunCreateHandler(logger, s.ch) + runActionsHandler := api.NewRunActionsHandler(logger, s.ah) + runCreateHandler := api.NewRunCreateHandler(logger, s.ah) changeGroupsUpdateTokensHandler := api.NewChangeGroupsUpdateTokensHandler(logger, s.readDB) router := mux.NewRouter()