diff --git a/internal/services/gateway/action/run.go b/internal/services/gateway/action/run.go index 5d311e6..884795e 100644 --- a/internal/services/gateway/action/run.go +++ b/internal/services/gateway/action/run.go @@ -16,8 +16,10 @@ package action import ( "context" + "encoding/json" "net/http" + "github.com/sorintlab/agola/internal/services/gateway/common" rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" "github.com/sorintlab/agola/internal/util" @@ -25,11 +27,10 @@ import ( ) func (h *ActionHandler) GetRun(ctx context.Context, runID string) (*rsapi.RunResponse, error) { - runResp, resp, err := h.runserviceClient.GetRun(ctx, runID) + runResp, resp, err := h.runserviceClient.GetRun(ctx, runID, nil) if err != nil { return nil, ErrFromRemote(resp, err) } - canGetRun, err := h.CanGetRun(ctx, runResp.RunConfig.Group) if err != nil { return nil, errors.Wrapf(err, "failed to determine permissions") @@ -79,7 +80,7 @@ type GetLogsRequest struct { } func (h *ActionHandler) GetLogs(ctx context.Context, req *GetLogsRequest) (*http.Response, error) { - runResp, resp, err := h.runserviceClient.GetRun(ctx, req.RunID) + runResp, resp, err := h.runserviceClient.GetRun(ctx, req.RunID, nil) if err != nil { return nil, ErrFromRemote(resp, err) } @@ -115,7 +116,7 @@ type RunActionsRequest struct { } func (h *ActionHandler) RunAction(ctx context.Context, req *RunActionsRequest) error { - runResp, resp, err := h.runserviceClient.GetRun(ctx, req.RunID) + runResp, resp, err := h.runserviceClient.GetRun(ctx, req.RunID, nil) if err != nil { return ErrFromRemote(resp, err) } @@ -166,28 +167,63 @@ type RunTaskActionsRequest struct { RunID string TaskID string - ActionType RunTaskActionType - ApprovalAnnotations map[string]string + ActionType RunTaskActionType } func (h *ActionHandler) RunTaskAction(ctx context.Context, req *RunTaskActionsRequest) error { - runResp, resp, err := h.runserviceClient.GetRun(ctx, req.RunID) + runResp, resp, err := h.runserviceClient.GetRun(ctx, req.RunID, nil) if err != nil { return ErrFromRemote(resp, err) } - canGetRun, err := h.CanDoRunActions(ctx, runResp.RunConfig.Group) + canDoRunAction, err := h.CanDoRunActions(ctx, runResp.RunConfig.Group) if err != nil { return errors.Wrapf(err, "failed to determine permissions") } - if !canGetRun { + if !canDoRunAction { return util.NewErrForbidden(errors.Errorf("user not authorized")) } + curUserID := h.CurrentUserID(ctx) + if curUserID == "" { + return util.NewErrBadRequest(errors.Errorf("no logged in user")) + } switch req.ActionType { case RunTaskActionTypeApprove: + rt, ok := runResp.Run.Tasks[req.TaskID] + if !ok { + return util.NewErrBadRequest(errors.Errorf("run %q doesn't have task %q", req.RunID, req.TaskID)) + } + + approvers := []string{} + annotations := map[string]string{} + if rt.Annotations != nil { + annotations = rt.Annotations + } + approversAnnotation, ok := annotations[common.ApproversAnnotation] + if ok { + if err := json.Unmarshal([]byte(approversAnnotation), &approvers); err != nil { + return errors.Wrapf(err, "failed to unmarshal run task approvers annotation") + } + } + + for _, approver := range approvers { + if approver == curUserID { + return util.NewErrBadRequest(errors.Errorf("user %q alredy approved the task", approver)) + } + } + approvers = append(approvers, curUserID) + + approversj, err := json.Marshal(approvers) + if err != nil { + return errors.Wrapf(err, "failed to marshal run task approvers annotation") + } + + annotations[common.ApproversAnnotation] = string(approversj) + rsreq := &rsapi.RunTaskActionsRequest{ - ActionType: rsapi.RunTaskActionTypeApprove, - ApprovalAnnotations: req.ApprovalAnnotations, + ActionType: rsapi.RunTaskActionTypeSetAnnotations, + Annotations: annotations, + ChangeGroupsUpdateToken: runResp.ChangeGroupsUpdateToken, } resp, err := h.runserviceClient.RunTaskActions(ctx, req.RunID, req.TaskID, rsreq) diff --git a/internal/services/gateway/api/run.go b/internal/services/gateway/api/run.go index d8990cc..0b8d0af 100644 --- a/internal/services/gateway/api/run.go +++ b/internal/services/gateway/api/run.go @@ -153,7 +153,7 @@ func createRunResponseTask(r *rstypes.Run, rt *rstypes.RunTask, rct *rstypes.Run WaitingApproval: rt.WaitingApproval, Approved: rt.Approved, - ApprovalAnnotations: rt.ApprovalAnnotations, + ApprovalAnnotations: rt.Annotations, Level: rct.Level, Depends: rct.Depends, @@ -170,7 +170,7 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *Run WaitingApproval: rt.WaitingApproval, Approved: rt.Approved, - ApprovalAnnotations: rt.ApprovalAnnotations, + ApprovalAnnotations: rt.Annotations, Steps: make([]*RunTaskResponseStep, len(rt.Steps)), @@ -415,8 +415,7 @@ func (h *RunActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type RunTaskActionsRequest struct { - ActionType action.RunTaskActionType `json:"action_type"` - ApprovalAnnotations map[string]string `json:"approval_annotations,omitempty"` + ActionType action.RunTaskActionType `json:"action_type"` } type RunTaskActionsHandler struct { @@ -442,10 +441,9 @@ func (h *RunTaskActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request } areq := &action.RunTaskActionsRequest{ - RunID: runID, - TaskID: taskID, - ActionType: req.ActionType, - ApprovalAnnotations: req.ApprovalAnnotations, + RunID: runID, + TaskID: taskID, + ActionType: req.ActionType, } err := h.ah.RunTaskAction(ctx, areq) diff --git a/internal/services/gateway/common/run.go b/internal/services/gateway/common/run.go index 3f2764f..bd495c7 100644 --- a/internal/services/gateway/common/run.go +++ b/internal/services/gateway/common/run.go @@ -35,6 +35,8 @@ const ( GroupTypeBranch GroupType = "branch" GroupTypeTag GroupType = "tag" GroupTypePullRequest GroupType = "pr" + + ApproversAnnotation = "approvers" ) func GenRunGroup(baseGroupType GroupType, baseGroupID string, webhookData *types.WebhookData) string { diff --git a/internal/services/gateway/gateway.go b/internal/services/gateway/gateway.go index d1d9bf9..dd7b138 100644 --- a/internal/services/gateway/gateway.go +++ b/internal/services/gateway/gateway.go @@ -263,7 +263,7 @@ func (g *Gateway) Run(ctx context.Context) error { apirouter.Handle("/runs/{runid}", authForcedHandler(runHandler)).Methods("GET") apirouter.Handle("/runs/{runid}/actions", authForcedHandler(runActionsHandler)).Methods("PUT") apirouter.Handle("/runs/{runid}/tasks/{taskid}", authForcedHandler(runtaskHandler)).Methods("GET") - apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", runTaskActionsHandler).Methods("PUT") + apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", authForcedHandler(runTaskActionsHandler)).Methods("PUT") apirouter.Handle("/runs", authForcedHandler(runsHandler)).Methods("GET") // TODO(sgotti) add auth to these requests diff --git a/internal/services/runservice/scheduler/action/action.go b/internal/services/runservice/scheduler/action/action.go index f4cf907..95749ba 100644 --- a/internal/services/runservice/scheduler/action/action.go +++ b/internal/services/runservice/scheduler/action/action.go @@ -469,10 +469,38 @@ func genRun(rc *types.RunConfig) *types.Run { return r } +type RunTaskSetAnnotationsRequest struct { + RunID string + TaskID string + Annotations map[string]string + ChangeGroupsUpdateToken string +} + +func (h *ActionHandler) RunTaskSetAnnotations(ctx context.Context, req *RunTaskSetAnnotationsRequest) error { + cgt, err := types.UnmarshalChangeGroupsUpdateToken(req.ChangeGroupsUpdateToken) + if err != nil { + return err + } + + r, _, err := store.GetRun(ctx, h.e, req.RunID) + if err != nil { + return err + } + + task, ok := r.Tasks[req.TaskID] + if !ok { + return util.NewErrBadRequest(errors.Errorf("run %q doesn't have task %q", r.ID, req.TaskID)) + } + + task.Annotations = req.Annotations + + _, err = store.AtomicPutRun(ctx, h.e, r, nil, cgt) + return err +} + type RunTaskApproveRequest struct { RunID string TaskID string - ApprovalAnnotations map[string]string ChangeGroupsUpdateToken string } @@ -502,7 +530,6 @@ func (h *ActionHandler) ApproveRunTask(ctx context.Context, req *RunTaskApproveR task.WaitingApproval = false task.Approved = true - task.ApprovalAnnotations = req.ApprovalAnnotations _, err = store.AtomicPutRun(ctx, h.e, r, nil, cgt) return err diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/scheduler/api/api.go index 8053f3e..1fa2d60 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/scheduler/api/api.go @@ -635,13 +635,18 @@ func (h *RunActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { type RunTaskActionType string const ( - RunTaskActionTypeApprove RunTaskActionType = "approve" + RunTaskActionTypeSetAnnotations RunTaskActionType = "setannotations" + RunTaskActionTypeApprove RunTaskActionType = "approve" ) type RunTaskActionsRequest struct { - ActionType RunTaskActionType `json:"action_type"` - ApprovalAnnotations map[string]string `json:"approval_annotations,omitempty"` - ChangeGroupsUpdateToken string `json:"change_groups_update_tokens"` + ActionType RunTaskActionType `json:"action_type"` + + // set Annotations fields + Annotations map[string]string `json:"annotations,omitempty"` + + // global fields + ChangeGroupsUpdateToken string `json:"change_groups_update_tokens"` } type RunTaskActionsHandler struct { @@ -671,6 +676,19 @@ func (h *RunTaskActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request } switch req.ActionType { + case RunTaskActionTypeSetAnnotations: + creq := &action.RunTaskSetAnnotationsRequest{ + RunID: runID, + TaskID: taskID, + Annotations: req.Annotations, + ChangeGroupsUpdateToken: req.ChangeGroupsUpdateToken, + } + if err := h.ah.RunTaskSetAnnotations(ctx, creq); err != nil { + h.log.Errorf("err: %+v", err) + httpError(w, err) + return + } + case RunTaskActionTypeApprove: creq := &action.RunTaskApproveRequest{ RunID: runID, @@ -682,6 +700,7 @@ func (h *RunTaskActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request httpError(w, err) return } + default: http.Error(w, "", http.StatusBadRequest) return diff --git a/internal/services/runservice/scheduler/api/client.go b/internal/services/runservice/scheduler/api/client.go index a2e8955..3651187 100644 --- a/internal/services/runservice/scheduler/api/client.go +++ b/internal/services/runservice/scheduler/api/client.go @@ -194,8 +194,12 @@ func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups []string, last return getRunsResponse, resp, err } -func (c *Client) GetQueuedRuns(ctx context.Context, start string, limit int) (*GetRunsResponse, *http.Response, error) { - return c.GetRuns(ctx, []string{"queued"}, []string{}, false, nil, start, limit, true) +func (c *Client) GetQueuedRuns(ctx context.Context, start string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { + return c.GetRuns(ctx, []string{"queued"}, []string{}, false, changeGroups, start, limit, true) +} + +func (c *Client) GetRunningRuns(ctx context.Context, start string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { + return c.GetRuns(ctx, []string{"running"}, []string{}, false, changeGroups, start, limit, true) } func (c *Client) GetGroupQueuedRuns(ctx context.Context, group string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { @@ -245,19 +249,33 @@ func (c *Client) RunTaskActions(ctx context.Context, runID, taskID string, req * return c.getResponse(ctx, "PUT", fmt.Sprintf("/runs/%s/tasks/%s/actions", runID, taskID), nil, -1, jsonContent, bytes.NewReader(reqj)) } -func (c *Client) ApproveRunTask(ctx context.Context, runID, taskID string, approvalAnnotations map[string]string, changeGroupsUpdateToken string) (*http.Response, error) { +func (c *Client) RunTaskSetAnnotations(ctx context.Context, runID, taskID string, annotations map[string]string, changeGroupsUpdateToken string) (*http.Response, error) { req := &RunTaskActionsRequest{ - ActionType: RunTaskActionTypeApprove, - ApprovalAnnotations: approvalAnnotations, + ActionType: RunTaskActionTypeSetAnnotations, + Annotations: annotations, ChangeGroupsUpdateToken: changeGroupsUpdateToken, } return c.RunTaskActions(ctx, runID, taskID, req) } -func (c *Client) GetRun(ctx context.Context, runID string) (*RunResponse, *http.Response, error) { +func (c *Client) ApproveRunTask(ctx context.Context, runID, taskID string, changeGroupsUpdateToken string) (*http.Response, error) { + req := &RunTaskActionsRequest{ + ActionType: RunTaskActionTypeApprove, + ChangeGroupsUpdateToken: changeGroupsUpdateToken, + } + + return c.RunTaskActions(ctx, runID, taskID, req) +} + +func (c *Client) GetRun(ctx context.Context, runID string, changeGroups []string) (*RunResponse, *http.Response, error) { + q := url.Values{} + for _, changeGroup := range changeGroups { + q.Add("changegroup", changeGroup) + } + runResponse := new(RunResponse) - resp, err := c.getParsedResponse(ctx, "GET", fmt.Sprintf("/runs/%s", runID), nil, jsonContent, nil, runResponse) + resp, err := c.getParsedResponse(ctx, "GET", fmt.Sprintf("/runs/%s", runID), q, jsonContent, nil, runResponse) return runResponse, resp, err } diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go index e3ae7c3..3385301 100644 --- a/internal/services/runservice/types/types.go +++ b/internal/services/runservice/types/types.go @@ -95,7 +95,7 @@ type Run struct { // /project/$projectid/pr/$prid Group string `json:"group,omitempty"` - // Annotations contain custom run properties + // Annotations contain custom run annotations Annotations map[string]string `json:"annotations,omitempty"` // Phase represent the current run status. A run could be running but already @@ -219,14 +219,15 @@ type RunTask struct { // there're no executor tasks scheduled Status RunTaskStatus `json:"status,omitempty"` + // Annotations contain custom task annotations + // these are opaque to the runservice and used for multiple pourposes. For + // example to stores task approval metadata. + Annotations map[string]string `json:"annotations,omitempty"` + Skip bool `json:"skip,omitempty"` WaitingApproval bool `json:"waiting_approval,omitempty"` Approved bool `json:"approved,omitempty"` - // ApprovalAnnotations stores data that the user can set on the approval. Useful - // to save approval information like the user who approved the task. - // This data is opaque to the run service - ApprovalAnnotations map[string]string `json:"approval_annotations,omitempty"` SetupStep RunTaskStep `json:"setup_step,omitempty"` Steps []*RunTaskStep `json:"steps,omitempty"` @@ -291,7 +292,7 @@ type RunConfig struct { // A list of setup errors when the run is in phase setuperror SetupErrors []string `json:"setup_errors,omitempty"` - // Annotations contain custom run properties + // Annotations contain custom run annotations // Note: Annotations are currently both saved in a Run and in RunConfig to // easily return them without loading RunConfig from the lts Annotations map[string]string `json:"annotations,omitempty"` diff --git a/internal/services/scheduler/scheduler.go b/internal/services/scheduler/scheduler.go index b70b2e2..8334ea1 100644 --- a/internal/services/scheduler/scheduler.go +++ b/internal/services/scheduler/scheduler.go @@ -16,11 +16,13 @@ package scheduler import ( "context" + "encoding/json" "fmt" "time" slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/services/config" + "github.com/sorintlab/agola/internal/services/gateway/common" rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" "github.com/sorintlab/agola/internal/util" @@ -48,7 +50,7 @@ func (s *Scheduler) schedule(ctx context.Context) error { var lastRunID string for { - queuedRunsResponse, _, err := s.runserviceClient.GetQueuedRuns(ctx, lastRunID, 0) + queuedRunsResponse, _, err := s.runserviceClient.GetQueuedRuns(ctx, lastRunID, 0, nil) if err != nil { return errors.Wrapf(err, "failed to get queued runs") } @@ -62,9 +64,7 @@ func (s *Scheduler) schedule(ctx context.Context) error { break } - if len(queuedRunsResponse.Runs) > 0 { - lastRunID = queuedRunsResponse.Runs[len(queuedRunsResponse.Runs)-1].ID - } + lastRunID = queuedRunsResponse.Runs[len(queuedRunsResponse.Runs)-1].ID } for groupID := range groups { @@ -107,6 +107,82 @@ func (s *Scheduler) scheduleRun(ctx context.Context, groupID string) error { return nil } +func (s *Scheduler) approveLoop(ctx context.Context) { + for { + if err := s.approve(ctx); err != nil { + log.Errorf("err: %+v", err) + } + time.Sleep(1 * time.Second) + } +} + +func (s *Scheduler) approve(ctx context.Context) error { + var lastRunID string + for { + runningRunsResponse, _, err := s.runserviceClient.GetRunningRuns(ctx, lastRunID, 0, nil) + if err != nil { + return errors.Wrapf(err, "failed to get running runs") + } + + if len(runningRunsResponse.Runs) == 0 { + break + } + + for _, run := range runningRunsResponse.Runs { + if err := s.approveRunTasks(ctx, run.ID); err != nil { + // just log error and continue with the other runs + log.Errorf("failed to approve run tasks for run %q: %+v", run.ID, err) + } + } + + lastRunID = runningRunsResponse.Runs[len(runningRunsResponse.Runs)-1].ID + } + + return nil +} + +func (s *Scheduler) approveRunTasks(ctx context.Context, runID string) error { + // refetch run with a dedicated changegroup + changegroup := util.EncodeSha256Hex(fmt.Sprintf("approval-%s", runID)) + runResp, _, err := s.runserviceClient.GetRun(ctx, runID, []string{changegroup}) + if err != nil { + return errors.Wrapf(err, "failed to get run %q", runID) + } + run := runResp.Run + + tasksWaitingApproval := run.TasksWaitingApproval() + for _, rtID := range tasksWaitingApproval { + rt, ok := run.Tasks[rtID] + if !ok { + return util.NewErrBadRequest(errors.Errorf("run %q doesn't have task %q", run.ID, rtID)) + } + annotations := rt.Annotations + if annotations == nil { + continue + } + approversAnnotation, ok := annotations[common.ApproversAnnotation] + if !ok { + continue + } + var approvers []string + if err := json.Unmarshal([]byte(approversAnnotation), &approvers); err != nil { + return errors.Wrapf(err, "failed to unmarshal run task approvers annotation") + } + // TODO(sgotti) change when we introduce a config the set the minimum number of required approvers + if len(approvers) > 0 { + rsreq := &rsapi.RunTaskActionsRequest{ + ActionType: rsapi.RunTaskActionTypeApprove, + ChangeGroupsUpdateToken: runResp.ChangeGroupsUpdateToken, + } + if _, err := s.runserviceClient.RunTaskActions(ctx, run.ID, rt.ID, rsreq); err != nil { + return errors.Wrapf(err, "failed to approve run") + } + } + } + + return nil +} + type Scheduler struct { c *config.Scheduler runserviceClient *rsapi.Client @@ -124,6 +200,7 @@ func NewScheduler(c *config.Scheduler) (*Scheduler, error) { func (s *Scheduler) Run(ctx context.Context) error { go s.scheduleLoop(ctx) + go s.approveLoop(ctx) select { case <-ctx.Done():