runservice: rename command(handler) to action(handler)

Since we're going to migrate all actions (also queries that now are implemented
in the api handlers) there
This commit is contained in:
Simone Gotti 2019-05-03 23:59:21 +02:00
parent 3f7e554f04
commit cb78ea48bc
5 changed files with 83 additions and 83 deletions

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package command
package action
import (
"context"
@ -36,7 +36,7 @@ import (
"go.uber.org/zap"
)
type CommandHandler struct {
type ActionHandler struct {
log *zap.SugaredLogger
e *etcd.Store
readDB *readdb.ReadDB
@ -44,8 +44,8 @@ type CommandHandler struct {
dm *datamanager.DataManager
}
func NewCommandHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) *CommandHandler {
return &CommandHandler{
func NewActionHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) *ActionHandler {
return &ActionHandler{
log: logger.Sugar(),
e: e,
readDB: readDB,
@ -60,13 +60,13 @@ type RunChangePhaseRequest struct {
ChangeGroupsUpdateToken string
}
func (s *CommandHandler) ChangeRunPhase(ctx context.Context, req *RunChangePhaseRequest) error {
func (h *ActionHandler) ChangeRunPhase(ctx context.Context, req *RunChangePhaseRequest) error {
cgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken)
if err != nil {
return err
}
r, _, err := store.GetRun(ctx, s.e, req.RunID)
r, _, err := store.GetRun(ctx, h.e, req.RunID)
if err != nil {
return err
}
@ -84,7 +84,7 @@ func (s *CommandHandler) ChangeRunPhase(ctx context.Context, req *RunChangePhase
r.Stop = true
}
_, err = store.AtomicPutRun(ctx, s.e, r, nil, cgt)
_, err = store.AtomicPutRun(ctx, h.e, r, nil, cgt)
return err
}
@ -93,13 +93,13 @@ type RunStopRequest struct {
ChangeGroupsUpdateToken string
}
func (s *CommandHandler) StopRun(ctx context.Context, req *RunStopRequest) error {
func (h *ActionHandler) StopRun(ctx context.Context, req *RunStopRequest) error {
cgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken)
if err != nil {
return err
}
r, _, err := store.GetRun(ctx, s.e, req.RunID)
r, _, err := store.GetRun(ctx, h.e, req.RunID)
if err != nil {
return err
}
@ -112,7 +112,7 @@ func (s *CommandHandler) StopRun(ctx context.Context, req *RunStopRequest) error
r.Stop = true
}
_, err = store.AtomicPutRun(ctx, s.e, r, nil, cgt)
_, err = store.AtomicPutRun(ctx, h.e, r, nil, cgt)
return err
}
@ -135,7 +135,7 @@ type RunCreateRequest struct {
ChangeGroupsUpdateToken string
}
func (s *CommandHandler) CreateRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) {
func (h *ActionHandler) CreateRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) {
runcgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken)
if err != nil {
return nil, err
@ -143,18 +143,18 @@ func (s *CommandHandler) CreateRun(ctx context.Context, req *RunCreateRequest) (
var rb *types.RunBundle
if req.RunID == "" {
rb, err = s.newRun(ctx, req)
rb, err = h.newRun(ctx, req)
} else {
rb, err = s.recreateRun(ctx, req)
rb, err = h.recreateRun(ctx, req)
}
if err != nil {
return nil, err
}
return rb, s.saveRun(ctx, rb, runcgt)
return rb, h.saveRun(ctx, rb, runcgt)
}
func (s *CommandHandler) newRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) {
func (h *ActionHandler) newRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) {
rcts := req.RunConfigTasks
setupErrors := req.SetupErrors
@ -169,21 +169,21 @@ func (s *CommandHandler) newRun(ctx context.Context, req *RunCreateRequest) (*ty
}
// generate a new run sequence that will be the same for the run and runconfig
seq, err := sequence.IncSequence(ctx, s.e, common.EtcdRunSequenceKey)
seq, err := sequence.IncSequence(ctx, h.e, common.EtcdRunSequenceKey)
if err != nil {
return nil, err
}
id := seq.String()
if err := runconfig.CheckRunConfigTasks(rcts); err != nil {
s.log.Errorf("check run config tasks failed: %+v", err)
h.log.Errorf("check run config tasks failed: %+v", err)
setupErrors = append(setupErrors, err.Error())
}
// generate tasks levels
if len(setupErrors) == 0 {
if err := runconfig.GenTasksLevels(rcts); err != nil {
s.log.Errorf("gen tasks leveles failed: %+v", err)
h.log.Errorf("gen tasks leveles failed: %+v", err)
setupErrors = append(setupErrors, err.Error())
}
}
@ -200,7 +200,7 @@ func (s *CommandHandler) newRun(ctx context.Context, req *RunCreateRequest) (*ty
}
run := genRun(rc)
s.log.Debugf("created run: %s", util.Dump(run))
h.log.Debugf("created run: %s", util.Dump(run))
return &types.RunBundle{
Run: run,
@ -208,22 +208,22 @@ func (s *CommandHandler) newRun(ctx context.Context, req *RunCreateRequest) (*ty
}, nil
}
func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) {
func (h *ActionHandler) recreateRun(ctx context.Context, req *RunCreateRequest) (*types.RunBundle, error) {
// generate a new run sequence that will be the same for the run and runconfig
seq, err := sequence.IncSequence(ctx, s.e, common.EtcdRunSequenceKey)
seq, err := sequence.IncSequence(ctx, h.e, common.EtcdRunSequenceKey)
if err != nil {
return nil, err
}
id := seq.String()
// fetch the existing runconfig and run
s.log.Infof("creating run from existing run")
rc, err := store.OSTGetRunConfig(s.dm, req.RunID)
h.log.Infof("creating run from existing run")
rc, err := store.OSTGetRunConfig(h.dm, req.RunID)
if err != nil {
return nil, util.NewErrBadRequest(errors.Wrapf(err, "runconfig %q doens't exist", req.RunID))
}
run, err := store.GetRunEtcdOrOST(ctx, s.e, s.dm, req.RunID)
run, err := store.GetRunEtcdOrOST(ctx, h.e, h.dm, req.RunID)
if err != nil {
return nil, err
}
@ -231,8 +231,8 @@ func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest)
return nil, util.NewErrBadRequest(errors.Wrapf(err, "run %q doens't exist", req.RunID))
}
s.log.Infof("rc: %s", util.Dump(rc))
s.log.Infof("run: %s", util.Dump(run))
h.log.Infof("rc: %s", util.Dump(rc))
h.log.Infof("run: %s", util.Dump(run))
if req.FromStart {
if canRestart, reason := run.CanRestartFromScratch(); !canRestart {
@ -246,8 +246,8 @@ func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest)
rb := recreateRun(util.DefaultUUIDGenerator{}, run, rc, id, req)
s.log.Infof("created rc from existing rc: %s", util.Dump(rb.Rc))
s.log.Infof("created run from existing run: %s", util.Dump(rb.Run))
h.log.Infof("created rc from existing rc: %s", util.Dump(rb.Rc))
h.log.Infof("created run from existing run: %s", util.Dump(rb.Run))
return rb, nil
}
@ -365,12 +365,12 @@ func recreateRun(uuid util.UUIDGenerator, run *types.Run, rc *types.RunConfig, n
}
}
func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcgt *types.ChangeGroupsUpdateToken) error {
func (h *ActionHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcgt *types.ChangeGroupsUpdateToken) error {
run := rb.Run
rc := rb.Rc
c, cgt, err := s.getRunCounter(run.Group)
s.log.Infof("c: %d, cgt: %s", c, util.Dump(cgt))
c, cgt, err := h.getRunCounter(run.Group)
h.log.Infof("c: %d, cgt: %s", c, util.Dump(cgt))
if err != nil {
return err
}
@ -395,15 +395,15 @@ func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcg
}
actions = append(actions, rca)
if _, err = s.dm.WriteWal(ctx, actions, cgt); err != nil {
if _, err = h.dm.WriteWal(ctx, actions, cgt); err != nil {
return err
}
runEvent, err := common.NewRunEvent(ctx, s.e, run.ID, run.Phase, run.Result)
runEvent, err := common.NewRunEvent(ctx, h.e, run.ID, run.Phase, run.Result)
if err != nil {
return err
}
if _, err := store.AtomicPutRun(ctx, s.e, run, runEvent, runcgt); err != nil {
if _, err := store.AtomicPutRun(ctx, h.e, run, runEvent, runcgt); err != nil {
return err
}
return nil
@ -476,13 +476,13 @@ type RunTaskApproveRequest struct {
ChangeGroupsUpdateToken string
}
func (s *CommandHandler) ApproveRunTask(ctx context.Context, req *RunTaskApproveRequest) error {
func (h *ActionHandler) ApproveRunTask(ctx context.Context, req *RunTaskApproveRequest) error {
cgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken)
if err != nil {
return err
}
r, _, err := store.GetRun(ctx, s.e, req.RunID)
r, _, err := store.GetRun(ctx, h.e, req.RunID)
if err != nil {
return err
}
@ -504,13 +504,13 @@ func (s *CommandHandler) ApproveRunTask(ctx context.Context, req *RunTaskApprove
task.Approved = true
task.ApprovalAnnotations = req.ApprovalAnnotations
_, err = store.AtomicPutRun(ctx, s.e, r, nil, cgt)
_, err = store.AtomicPutRun(ctx, h.e, r, nil, cgt)
return err
}
func (s *CommandHandler) DeleteExecutor(ctx context.Context, executorID string) error {
func (h *ActionHandler) DeleteExecutor(ctx context.Context, executorID string) error {
// mark all executor tasks as failed
ets, err := store.GetExecutorTasks(ctx, s.e, executorID)
ets, err := store.GetExecutorTasks(ctx, h.e, executorID)
if err != nil {
return err
}
@ -518,20 +518,20 @@ func (s *CommandHandler) DeleteExecutor(ctx context.Context, executorID string)
for _, et := range ets {
et.Status.Phase = types.ExecutorTaskPhaseFailed
et.FailError = "executor deleted"
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
if _, err := store.AtomicPutExecutorTask(ctx, h.e, et); err != nil {
return err
}
}
// delete the executor
if err := store.DeleteExecutor(ctx, s.e, executorID); err != nil {
if err := store.DeleteExecutor(ctx, h.e, executorID); err != nil {
return err
}
return nil
}
func (s *CommandHandler) getRunCounter(group string) (uint64, *datamanager.ChangeGroupsUpdateToken, error) {
func (h *ActionHandler) getRunCounter(group string) (uint64, *datamanager.ChangeGroupsUpdateToken, error) {
// use the first group dir after the root
pl := util.PathList(group)
if len(pl) < 2 {
@ -540,13 +540,13 @@ func (s *CommandHandler) getRunCounter(group string) (uint64, *datamanager.Chang
var c uint64
var cgt *datamanager.ChangeGroupsUpdateToken
err := s.readDB.Do(func(tx *db.Tx) error {
err := h.readDB.Do(func(tx *db.Tx) error {
var err error
c, err = s.readDB.GetRunCounterOST(tx, pl[1])
c, err = h.readDB.GetRunCounterOST(tx, pl[1])
if err != nil {
return err
}
cgt, err = s.readDB.GetChangeGroupsUpdateTokensOST(tx, []string{"counter-" + pl[1]})
cgt, err = h.readDB.GetChangeGroupsUpdateTokensOST(tx, []string{"counter-" + pl[1]})
return err
})
if err != nil {

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package command
package action
import (
"testing"
@ -167,7 +167,7 @@ func TestRecreateRun(t *testing.T) {
// initial run that matched the runconfig, all tasks are not started or skipped
// (if the runconfig task as Skip == true). This must match the status
// generated by command.genRun()
// generated by action.genRun()
run := genRun(rc)
outrun := genRun(outrc)

View File

@ -27,7 +27,7 @@ import (
"github.com/sorintlab/agola/internal/db"
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/command"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/action"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store"
@ -501,13 +501,13 @@ type RunCreateRequest struct {
type RunCreateHandler struct {
log *zap.SugaredLogger
ch *command.CommandHandler
ah *action.ActionHandler
}
func NewRunCreateHandler(logger *zap.Logger, ch *command.CommandHandler) *RunCreateHandler {
func NewRunCreateHandler(logger *zap.Logger, ah *action.ActionHandler) *RunCreateHandler {
return &RunCreateHandler{
log: logger.Sugar(),
ch: ch,
ah: ah,
}
}
@ -521,7 +521,7 @@ func (h *RunCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
creq := &command.RunCreateRequest{
creq := &action.RunCreateRequest{
RunConfigTasks: req.RunConfigTasks,
Name: req.Name,
Group: req.Group,
@ -536,7 +536,7 @@ func (h *RunCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Annotations: req.Annotations,
ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken,
}
rb, err := h.ch.CreateRun(ctx, creq)
rb, err := h.ah.CreateRun(ctx, creq)
if err != nil {
h.log.Errorf("err: %+v", err)
httpError(w, err)
@ -569,14 +569,14 @@ type RunActionsRequest struct {
type RunActionsHandler struct {
log *zap.SugaredLogger
ch *command.CommandHandler
ah *action.ActionHandler
readDB *readdb.ReadDB
}
func NewRunActionsHandler(logger *zap.Logger, ch *command.CommandHandler) *RunActionsHandler {
func NewRunActionsHandler(logger *zap.Logger, ah *action.ActionHandler) *RunActionsHandler {
return &RunActionsHandler{
log: logger.Sugar(),
ch: ch,
ah: ah,
}
}
@ -594,22 +594,22 @@ func (h *RunActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch req.ActionType {
case RunActionTypeChangePhase:
creq := &command.RunChangePhaseRequest{
creq := &action.RunChangePhaseRequest{
RunID: runID,
Phase: req.Phase,
ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken,
}
if err := h.ch.ChangeRunPhase(ctx, creq); err != nil {
if err := h.ah.ChangeRunPhase(ctx, creq); err != nil {
h.log.Errorf("err: %+v", err)
httpError(w, err)
return
}
case RunActionTypeStop:
creq := &command.RunStopRequest{
creq := &action.RunStopRequest{
RunID: runID,
ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken,
}
if err := h.ch.StopRun(ctx, creq); err != nil {
if err := h.ah.StopRun(ctx, creq); err != nil {
h.log.Errorf("err: %+v", err)
httpError(w, err)
return
@ -634,14 +634,14 @@ type RunTaskActionsRequest struct {
type RunTaskActionsHandler struct {
log *zap.SugaredLogger
ch *command.CommandHandler
ah *action.ActionHandler
readDB *readdb.ReadDB
}
func NewRunTaskActionsHandler(logger *zap.Logger, ch *command.CommandHandler) *RunTaskActionsHandler {
func NewRunTaskActionsHandler(logger *zap.Logger, ah *action.ActionHandler) *RunTaskActionsHandler {
return &RunTaskActionsHandler{
log: logger.Sugar(),
ch: ch,
ah: ah,
}
}
@ -660,12 +660,12 @@ func (h *RunTaskActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
switch req.ActionType {
case RunTaskActionTypeApprove:
creq := &command.RunTaskApproveRequest{
creq := &action.RunTaskApproveRequest{
RunID: runID,
TaskID: taskID,
ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken,
}
if err := h.ch.ApproveRunTask(ctx, creq); err != nil {
if err := h.ah.ApproveRunTask(ctx, creq); err != nil {
h.log.Errorf("err: %+v", err)
httpError(w, err)
return

View File

@ -26,7 +26,7 @@ import (
"github.com/gorilla/mux"
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/command"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/action"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store"
"github.com/sorintlab/agola/internal/services/runservice/types"
@ -36,11 +36,11 @@ import (
type ExecutorStatusHandler struct {
log *zap.SugaredLogger
e *etcd.Store
ch *command.CommandHandler
ah *action.ActionHandler
}
func NewExecutorStatusHandler(logger *zap.Logger, e *etcd.Store, ch *command.CommandHandler) *ExecutorStatusHandler {
return &ExecutorStatusHandler{log: logger.Sugar(), e: e, ch: ch}
func NewExecutorStatusHandler(logger *zap.Logger, e *etcd.Store, ah *action.ActionHandler) *ExecutorStatusHandler {
return &ExecutorStatusHandler{log: logger.Sugar(), e: e, ah: ah}
}
func (h *ExecutorStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -92,7 +92,7 @@ func (h *ExecutorStatusHandler) deleteStaleExecutors(ctx context.Context, curExe
}
}
if !active {
if err := h.ch.DeleteExecutor(ctx, executor.ID); err != nil {
if err := h.ah.DeleteExecutor(ctx, executor.ID); err != nil {
h.log.Errorf("failed to delete executor %q: %v", executor.ID, err)
}
}
@ -429,13 +429,13 @@ func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type ExecutorDeleteHandler struct {
log *zap.SugaredLogger
ch *command.CommandHandler
ah *action.ActionHandler
}
func NewExecutorDeleteHandler(logger *zap.Logger, ch *command.CommandHandler) *ExecutorDeleteHandler {
func NewExecutorDeleteHandler(logger *zap.Logger, ah *action.ActionHandler) *ExecutorDeleteHandler {
return &ExecutorDeleteHandler{
log: logger.Sugar(),
ch: ch,
ah: ah,
}
}
@ -450,7 +450,7 @@ func (h *ExecutorDeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
return
}
if err := h.ch.DeleteExecutor(ctx, executorID); err != nil {
if err := h.ah.DeleteExecutor(ctx, executorID); err != nil {
http.Error(w, "", http.StatusInternalServerError)
return
}

View File

@ -34,8 +34,8 @@ import (
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/runconfig"
"github.com/sorintlab/agola/internal/services/config"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/action"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/api"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/command"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store"
@ -1399,7 +1399,7 @@ type Scheduler struct {
ost *objectstorage.ObjStorage
dm *datamanager.DataManager
readDB *readdb.ReadDB
ch *command.CommandHandler
ah *action.ActionHandler
}
func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Scheduler, error) {
@ -1443,8 +1443,8 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule
}
s.readDB = readDB
ch := command.NewCommandHandler(logger, e, readDB, ost, dm)
s.ch = ch
ah := action.NewActionHandler(logger, e, readDB, ost, dm)
s.ah = ah
return s, nil
}
@ -1497,7 +1497,7 @@ func (s *Scheduler) Run(ctx context.Context) error {
corsHandler = ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions)
// executor dedicated api, only calls from executor should happen on these handlers
executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ch)
executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah)
executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch)
executorTaskHandler := api.NewExecutorTaskHandler(s.e)
executorTasksHandler := api.NewExecutorTasksHandler(s.e)
@ -1506,15 +1506,15 @@ func (s *Scheduler) Run(ctx context.Context) error {
cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost)
// api from clients
executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ch)
executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ah)
logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.dm)
runHandler := api.NewRunHandler(logger, s.e, s.dm, s.readDB)
runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ch)
runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ah)
runsHandler := api.NewRunsHandler(logger, s.readDB)
runActionsHandler := api.NewRunActionsHandler(logger, s.ch)
runCreateHandler := api.NewRunCreateHandler(logger, s.ch)
runActionsHandler := api.NewRunActionsHandler(logger, s.ah)
runCreateHandler := api.NewRunCreateHandler(logger, s.ah)
changeGroupsUpdateTokensHandler := api.NewChangeGroupsUpdateTokensHandler(logger, s.readDB)
router := mux.NewRouter()