From afae185e1156d77848c93bda50a862a05c09f0ee Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Mon, 6 May 2019 15:19:29 +0200 Subject: [PATCH] *: rework run approval and annotations * runservice: use generic task annotations instead of approval annotations * runservice: add method to set task annotations * gateway: when an user call the run task approval action, it will set in the task annotations the approval users ids. The task won't be approved. * scheduler: when the number of approvers meets the required minimum number (currently 1) call the runservice to approve the task In this way we could easily implement some approval features like requiring a minimum number of approvers (saved in the task annotations) before marking the run as approved in the runservice. --- internal/services/gateway/action/run.go | 58 ++++++++++--- internal/services/gateway/api/run.go | 14 ++- internal/services/gateway/common/run.go | 2 + internal/services/gateway/gateway.go | 2 +- .../runservice/scheduler/action/action.go | 31 ++++++- .../services/runservice/scheduler/api/api.go | 27 +++++- .../runservice/scheduler/api/client.go | 32 +++++-- internal/services/runservice/types/types.go | 13 +-- internal/services/scheduler/scheduler.go | 85 ++++++++++++++++++- 9 files changed, 221 insertions(+), 43 deletions(-) diff --git a/internal/services/gateway/action/run.go b/internal/services/gateway/action/run.go index 5d311e6..884795e 100644 --- a/internal/services/gateway/action/run.go +++ b/internal/services/gateway/action/run.go @@ -16,8 +16,10 @@ package action import ( "context" + "encoding/json" "net/http" + "github.com/sorintlab/agola/internal/services/gateway/common" rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" "github.com/sorintlab/agola/internal/util" @@ -25,11 +27,10 @@ import ( ) func (h *ActionHandler) GetRun(ctx context.Context, runID string) (*rsapi.RunResponse, error) { - runResp, resp, err := h.runserviceClient.GetRun(ctx, runID) + runResp, resp, err := h.runserviceClient.GetRun(ctx, runID, nil) if err != nil { return nil, ErrFromRemote(resp, err) } - canGetRun, err := h.CanGetRun(ctx, runResp.RunConfig.Group) if err != nil { return nil, errors.Wrapf(err, "failed to determine permissions") @@ -79,7 +80,7 @@ type GetLogsRequest struct { } func (h *ActionHandler) GetLogs(ctx context.Context, req *GetLogsRequest) (*http.Response, error) { - runResp, resp, err := h.runserviceClient.GetRun(ctx, req.RunID) + runResp, resp, err := h.runserviceClient.GetRun(ctx, req.RunID, nil) if err != nil { return nil, ErrFromRemote(resp, err) } @@ -115,7 +116,7 @@ type RunActionsRequest struct { } func (h *ActionHandler) RunAction(ctx context.Context, req *RunActionsRequest) error { - runResp, resp, err := h.runserviceClient.GetRun(ctx, req.RunID) + runResp, resp, err := h.runserviceClient.GetRun(ctx, req.RunID, nil) if err != nil { return ErrFromRemote(resp, err) } @@ -166,28 +167,63 @@ type RunTaskActionsRequest struct { RunID string TaskID string - ActionType RunTaskActionType - ApprovalAnnotations map[string]string + ActionType RunTaskActionType } func (h *ActionHandler) RunTaskAction(ctx context.Context, req *RunTaskActionsRequest) error { - runResp, resp, err := h.runserviceClient.GetRun(ctx, req.RunID) + runResp, resp, err := h.runserviceClient.GetRun(ctx, req.RunID, nil) if err != nil { return ErrFromRemote(resp, err) } - canGetRun, err := h.CanDoRunActions(ctx, runResp.RunConfig.Group) + canDoRunAction, err := h.CanDoRunActions(ctx, runResp.RunConfig.Group) if err != nil { return errors.Wrapf(err, "failed to determine permissions") } - if !canGetRun { + if !canDoRunAction { return util.NewErrForbidden(errors.Errorf("user not authorized")) } + curUserID := h.CurrentUserID(ctx) + if curUserID == "" { + return util.NewErrBadRequest(errors.Errorf("no logged in user")) + } switch req.ActionType { case RunTaskActionTypeApprove: + rt, ok := runResp.Run.Tasks[req.TaskID] + if !ok { + return util.NewErrBadRequest(errors.Errorf("run %q doesn't have task %q", req.RunID, req.TaskID)) + } + + approvers := []string{} + annotations := map[string]string{} + if rt.Annotations != nil { + annotations = rt.Annotations + } + approversAnnotation, ok := annotations[common.ApproversAnnotation] + if ok { + if err := json.Unmarshal([]byte(approversAnnotation), &approvers); err != nil { + return errors.Wrapf(err, "failed to unmarshal run task approvers annotation") + } + } + + for _, approver := range approvers { + if approver == curUserID { + return util.NewErrBadRequest(errors.Errorf("user %q alredy approved the task", approver)) + } + } + approvers = append(approvers, curUserID) + + approversj, err := json.Marshal(approvers) + if err != nil { + return errors.Wrapf(err, "failed to marshal run task approvers annotation") + } + + annotations[common.ApproversAnnotation] = string(approversj) + rsreq := &rsapi.RunTaskActionsRequest{ - ActionType: rsapi.RunTaskActionTypeApprove, - ApprovalAnnotations: req.ApprovalAnnotations, + ActionType: rsapi.RunTaskActionTypeSetAnnotations, + Annotations: annotations, + ChangeGroupsUpdateToken: runResp.ChangeGroupsUpdateToken, } resp, err := h.runserviceClient.RunTaskActions(ctx, req.RunID, req.TaskID, rsreq) diff --git a/internal/services/gateway/api/run.go b/internal/services/gateway/api/run.go index d8990cc..0b8d0af 100644 --- a/internal/services/gateway/api/run.go +++ b/internal/services/gateway/api/run.go @@ -153,7 +153,7 @@ func createRunResponseTask(r *rstypes.Run, rt *rstypes.RunTask, rct *rstypes.Run WaitingApproval: rt.WaitingApproval, Approved: rt.Approved, - ApprovalAnnotations: rt.ApprovalAnnotations, + ApprovalAnnotations: rt.Annotations, Level: rct.Level, Depends: rct.Depends, @@ -170,7 +170,7 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *Run WaitingApproval: rt.WaitingApproval, Approved: rt.Approved, - ApprovalAnnotations: rt.ApprovalAnnotations, + ApprovalAnnotations: rt.Annotations, Steps: make([]*RunTaskResponseStep, len(rt.Steps)), @@ -415,8 +415,7 @@ func (h *RunActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type RunTaskActionsRequest struct { - ActionType action.RunTaskActionType `json:"action_type"` - ApprovalAnnotations map[string]string `json:"approval_annotations,omitempty"` + ActionType action.RunTaskActionType `json:"action_type"` } type RunTaskActionsHandler struct { @@ -442,10 +441,9 @@ func (h *RunTaskActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request } areq := &action.RunTaskActionsRequest{ - RunID: runID, - TaskID: taskID, - ActionType: req.ActionType, - ApprovalAnnotations: req.ApprovalAnnotations, + RunID: runID, + TaskID: taskID, + ActionType: req.ActionType, } err := h.ah.RunTaskAction(ctx, areq) diff --git a/internal/services/gateway/common/run.go b/internal/services/gateway/common/run.go index 3f2764f..bd495c7 100644 --- a/internal/services/gateway/common/run.go +++ b/internal/services/gateway/common/run.go @@ -35,6 +35,8 @@ const ( GroupTypeBranch GroupType = "branch" GroupTypeTag GroupType = "tag" GroupTypePullRequest GroupType = "pr" + + ApproversAnnotation = "approvers" ) func GenRunGroup(baseGroupType GroupType, baseGroupID string, webhookData *types.WebhookData) string { diff --git a/internal/services/gateway/gateway.go b/internal/services/gateway/gateway.go index d1d9bf9..dd7b138 100644 --- a/internal/services/gateway/gateway.go +++ b/internal/services/gateway/gateway.go @@ -263,7 +263,7 @@ func (g *Gateway) Run(ctx context.Context) error { apirouter.Handle("/runs/{runid}", authForcedHandler(runHandler)).Methods("GET") apirouter.Handle("/runs/{runid}/actions", authForcedHandler(runActionsHandler)).Methods("PUT") apirouter.Handle("/runs/{runid}/tasks/{taskid}", authForcedHandler(runtaskHandler)).Methods("GET") - apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", runTaskActionsHandler).Methods("PUT") + apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", authForcedHandler(runTaskActionsHandler)).Methods("PUT") apirouter.Handle("/runs", authForcedHandler(runsHandler)).Methods("GET") // TODO(sgotti) add auth to these requests diff --git a/internal/services/runservice/scheduler/action/action.go b/internal/services/runservice/scheduler/action/action.go index f4cf907..95749ba 100644 --- a/internal/services/runservice/scheduler/action/action.go +++ b/internal/services/runservice/scheduler/action/action.go @@ -469,10 +469,38 @@ func genRun(rc *types.RunConfig) *types.Run { return r } +type RunTaskSetAnnotationsRequest struct { + RunID string + TaskID string + Annotations map[string]string + ChangeGroupsUpdateToken string +} + +func (h *ActionHandler) RunTaskSetAnnotations(ctx context.Context, req *RunTaskSetAnnotationsRequest) error { + cgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken) + if err != nil { + return err + } + + r, _, err := store.GetRun(ctx, h.e, req.RunID) + if err != nil { + return err + } + + task, ok := r.Tasks[req.TaskID] + if !ok { + return util.NewErrBadRequest(errors.Errorf("run %q doesn't have task %q", r.ID, req.TaskID)) + } + + task.Annotations = req.Annotations + + _, err = store.AtomicPutRun(ctx, h.e, r, nil, cgt) + return err +} + type RunTaskApproveRequest struct { RunID string TaskID string - ApprovalAnnotations map[string]string ChangeGroupsUpdateToken string } @@ -502,7 +530,6 @@ func (h *ActionHandler) ApproveRunTask(ctx context.Context, req *RunTaskApproveR task.WaitingApproval = false task.Approved = true - task.ApprovalAnnotations = req.ApprovalAnnotations _, err = store.AtomicPutRun(ctx, h.e, r, nil, cgt) return err diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/scheduler/api/api.go index 8053f3e..1fa2d60 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/scheduler/api/api.go @@ -635,13 +635,18 @@ func (h *RunActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { type RunTaskActionType string const ( - RunTaskActionTypeApprove RunTaskActionType = "approve" + RunTaskActionTypeSetAnnotations RunTaskActionType = "setannotations" + RunTaskActionTypeApprove RunTaskActionType = "approve" ) type RunTaskActionsRequest struct { - ActionType RunTaskActionType `json:"action_type"` - ApprovalAnnotations map[string]string `json:"approval_annotations,omitempty"` - ChangeGroupsUpdateToken string `json:"change_groups_update_tokens"` + ActionType RunTaskActionType `json:"action_type"` + + // set Annotations fields + Annotations map[string]string `json:"annotations,omitempty"` + + // global fields + ChangeGroupsUpdateToken string `json:"change_groups_update_tokens"` } type RunTaskActionsHandler struct { @@ -671,6 +676,19 @@ func (h *RunTaskActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request } switch req.ActionType { + case RunTaskActionTypeSetAnnotations: + creq := &action.RunTaskSetAnnotationsRequest{ + RunID: runID, + TaskID: taskID, + Annotations: req.Annotations, + ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken, + } + if err := h.ah.RunTaskSetAnnotations(ctx, creq); err != nil { + h.log.Errorf("err: %+v", err) + httpError(w, err) + return + } + case RunTaskActionTypeApprove: creq := &action.RunTaskApproveRequest{ RunID: runID, @@ -682,6 +700,7 @@ func (h *RunTaskActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request httpError(w, err) return } + default: http.Error(w, "", http.StatusBadRequest) return diff --git a/internal/services/runservice/scheduler/api/client.go b/internal/services/runservice/scheduler/api/client.go index a2e8955..3651187 100644 --- a/internal/services/runservice/scheduler/api/client.go +++ b/internal/services/runservice/scheduler/api/client.go @@ -194,8 +194,12 @@ func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups []string, last return getRunsResponse, resp, err } -func (c *Client) GetQueuedRuns(ctx context.Context, start string, limit int) (*GetRunsResponse, *http.Response, error) { - return c.GetRuns(ctx, []string{"queued"}, []string{}, false, nil, start, limit, true) +func (c *Client) GetQueuedRuns(ctx context.Context, start string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { + return c.GetRuns(ctx, []string{"queued"}, []string{}, false, changeGroups, start, limit, true) +} + +func (c *Client) GetRunningRuns(ctx context.Context, start string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { + return c.GetRuns(ctx, []string{"running"}, []string{}, false, changeGroups, start, limit, true) } func (c *Client) GetGroupQueuedRuns(ctx context.Context, group string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { @@ -245,19 +249,33 @@ func (c *Client) RunTaskActions(ctx context.Context, runID, taskID string, req * return c.getResponse(ctx, "PUT", fmt.Sprintf("/runs/%s/tasks/%s/actions", runID, taskID), nil, -1, jsonContent, bytes.NewReader(reqj)) } -func (c *Client) ApproveRunTask(ctx context.Context, runID, taskID string, approvalAnnotations map[string]string, changeGroupsUpdateToken string) (*http.Response, error) { +func (c *Client) RunTaskSetAnnotations(ctx context.Context, runID, taskID string, annotations map[string]string, changeGroupsUpdateToken string) (*http.Response, error) { req := &RunTaskActionsRequest{ - ActionType: RunTaskActionTypeApprove, - ApprovalAnnotations: approvalAnnotations, + ActionType: RunTaskActionTypeSetAnnotations, + Annotations: annotations, ChangeGroupsUpdateToken: changeGroupsUpdateToken, } return c.RunTaskActions(ctx, runID, taskID, req) } -func (c *Client) GetRun(ctx context.Context, runID string) (*RunResponse, *http.Response, error) { +func (c *Client) ApproveRunTask(ctx context.Context, runID, taskID string, changeGroupsUpdateToken string) (*http.Response, error) { + req := &RunTaskActionsRequest{ + ActionType: RunTaskActionTypeApprove, + ChangeGroupsUpdateToken: changeGroupsUpdateToken, + } + + return c.RunTaskActions(ctx, runID, taskID, req) +} + +func (c *Client) GetRun(ctx context.Context, runID string, changeGroups []string) (*RunResponse, *http.Response, error) { + q := url.Values{} + for _, changeGroup := range changeGroups { + q.Add("changegroup", changeGroup) + } + runResponse := new(RunResponse) - resp, err := c.getParsedResponse(ctx, "GET", fmt.Sprintf("/runs/%s", runID), nil, jsonContent, nil, runResponse) + resp, err := c.getParsedResponse(ctx, "GET", fmt.Sprintf("/runs/%s", runID), q, jsonContent, nil, runResponse) return runResponse, resp, err } diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go index e3ae7c3..3385301 100644 --- a/internal/services/runservice/types/types.go +++ b/internal/services/runservice/types/types.go @@ -95,7 +95,7 @@ type Run struct { // /project/$projectid/pr/$prid Group string `json:"group,omitempty"` - // Annotations contain custom run properties + // Annotations contain custom run annotations Annotations map[string]string `json:"annotations,omitempty"` // Phase represent the current run status. A run could be running but already @@ -219,14 +219,15 @@ type RunTask struct { // there're no executor tasks scheduled Status RunTaskStatus `json:"status,omitempty"` + // Annotations contain custom task annotations + // these are opaque to the runservice and used for multiple pourposes. For + // example to stores task approval metadata. + Annotations map[string]string `json:"annotations,omitempty"` + Skip bool `json:"skip,omitempty"` WaitingApproval bool `json:"waiting_approval,omitempty"` Approved bool `json:"approved,omitempty"` - // ApprovalAnnotations stores data that the user can set on the approval. Useful - // to save approval information like the user who approved the task. - // This data is opaque to the run service - ApprovalAnnotations map[string]string `json:"approval_annotations,omitempty"` SetupStep RunTaskStep `json:"setup_step,omitempty"` Steps []*RunTaskStep `json:"steps,omitempty"` @@ -291,7 +292,7 @@ type RunConfig struct { // A list of setup errors when the run is in phase setuperror SetupErrors []string `json:"setup_errors,omitempty"` - // Annotations contain custom run properties + // Annotations contain custom run annotations // Note: Annotations are currently both saved in a Run and in RunConfig to // easily return them without loading RunConfig from the lts Annotations map[string]string `json:"annotations,omitempty"` diff --git a/internal/services/scheduler/scheduler.go b/internal/services/scheduler/scheduler.go index b70b2e2..8334ea1 100644 --- a/internal/services/scheduler/scheduler.go +++ b/internal/services/scheduler/scheduler.go @@ -16,11 +16,13 @@ package scheduler import ( "context" + "encoding/json" "fmt" "time" slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/services/config" + "github.com/sorintlab/agola/internal/services/gateway/common" rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" "github.com/sorintlab/agola/internal/util" @@ -48,7 +50,7 @@ func (s *Scheduler) schedule(ctx context.Context) error { var lastRunID string for { - queuedRunsResponse, _, err := s.runserviceClient.GetQueuedRuns(ctx, lastRunID, 0) + queuedRunsResponse, _, err := s.runserviceClient.GetQueuedRuns(ctx, lastRunID, 0, nil) if err != nil { return errors.Wrapf(err, "failed to get queued runs") } @@ -62,9 +64,7 @@ func (s *Scheduler) schedule(ctx context.Context) error { break } - if len(queuedRunsResponse.Runs) > 0 { - lastRunID = queuedRunsResponse.Runs[len(queuedRunsResponse.Runs)-1].ID - } + lastRunID = queuedRunsResponse.Runs[len(queuedRunsResponse.Runs)-1].ID } for groupID := range groups { @@ -107,6 +107,82 @@ func (s *Scheduler) scheduleRun(ctx context.Context, groupID string) error { return nil } +func (s *Scheduler) approveLoop(ctx context.Context) { + for { + if err := s.approve(ctx); err != nil { + log.Errorf("err: %+v", err) + } + time.Sleep(1 * time.Second) + } +} + +func (s *Scheduler) approve(ctx context.Context) error { + var lastRunID string + for { + runningRunsResponse, _, err := s.runserviceClient.GetRunningRuns(ctx, lastRunID, 0, nil) + if err != nil { + return errors.Wrapf(err, "failed to get running runs") + } + + if len(runningRunsResponse.Runs) == 0 { + break + } + + for _, run := range runningRunsResponse.Runs { + if err := s.approveRunTasks(ctx, run.ID); err != nil { + // just log error and continue with the other runs + log.Errorf("failed to approve run tasks for run %q: %+v", run.ID, err) + } + } + + lastRunID = runningRunsResponse.Runs[len(runningRunsResponse.Runs)-1].ID + } + + return nil +} + +func (s *Scheduler) approveRunTasks(ctx context.Context, runID string) error { + // refetch run with a dedicated changegroup + changegroup := util.EncodeSha256Hex(fmt.Sprintf("approval-%s", runID)) + runResp, _, err := s.runserviceClient.GetRun(ctx, runID, []string{changegroup}) + if err != nil { + return errors.Wrapf(err, "failed to get run %q", runID) + } + run := runResp.Run + + tasksWaitingApproval := run.TasksWaitingApproval() + for _, rtID := range tasksWaitingApproval { + rt, ok := run.Tasks[rtID] + if !ok { + return util.NewErrBadRequest(errors.Errorf("run %q doesn't have task %q", run.ID, rtID)) + } + annotations := rt.Annotations + if annotations == nil { + continue + } + approversAnnotation, ok := annotations[common.ApproversAnnotation] + if !ok { + continue + } + var approvers []string + if err := json.Unmarshal([]byte(approversAnnotation), &approvers); err != nil { + return errors.Wrapf(err, "failed to unmarshal run task approvers annotation") + } + // TODO(sgotti) change when we introduce a config the set the minimum number of required approvers + if len(approvers) > 0 { + rsreq := &rsapi.RunTaskActionsRequest{ + ActionType: rsapi.RunTaskActionTypeApprove, + ChangeGroupsUpdateToken: runResp.ChangeGroupsUpdateToken, + } + if _, err := s.runserviceClient.RunTaskActions(ctx, run.ID, rt.ID, rsreq); err != nil { + return errors.Wrapf(err, "failed to approve run") + } + } + } + + return nil +} + type Scheduler struct { c *config.Scheduler runserviceClient *rsapi.Client @@ -124,6 +200,7 @@ func NewScheduler(c *config.Scheduler) (*Scheduler, error) { func (s *Scheduler) Run(ctx context.Context) error { go s.scheduleLoop(ctx) + go s.approveLoop(ctx) select { case <-ctx.Done():