agola/internal/services/runservice/scheduler/api/api.go
Simone Gotti afae185e11 *: rework run approval and annotations
* runservice: use generic task annotations instead of approval annotations
* runservice: add method to set task annotations

* gateway: when a user calls the run task approval action, the approving users'
IDs are stored in the task annotations. The task itself is not approved yet.

* scheduler: when the number of approvers meets the required minimum number
(currently 1) call the runservice to approve the task

In this way we could easily implement some approval features like requiring a
minimum number of approvers (saved in the task annotations) before marking the
run as approved in the runservice.
2019-05-06 15:19:29 +02:00

709 lines
17 KiB
Go

// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/db"
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/action"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store"
"github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// ErrorResponse is the JSON body sent to the client when a request fails.
type ErrorResponse struct {
// Message is the error text exposed to the client.
Message string `json:"message"`
}
// ErrorResponseFromError builds the ErrorResponse to send to the client for
// err. Bad request and not found errors expose their own message; any other
// error is replaced by a generic message so the real error is not leaked.
func ErrorResponseFromError(err error) *ErrorResponse {
	if util.IsErrBadRequest(err) || util.IsErrNotFound(err) {
		return &ErrorResponse{Message: err.Error()}
	}

	return &ErrorResponse{Message: "internal server error"}
}
// httpError writes err to w as a JSON encoded ErrorResponse and reports
// whether an error response was written.
//
// It returns false, without touching w, when err is nil. Otherwise it returns
// true after writing a response whose status is 400 for bad request errors,
// 404 for not found errors and 500 for everything else. If the error response
// itself cannot be marshaled only a bare 500 status is sent.
func httpError(w http.ResponseWriter, err error) bool {
	if err == nil {
		return false
	}

	resj, merr := json.Marshal(ErrorResponseFromError(err))
	if merr != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return true
	}

	code := http.StatusInternalServerError
	switch {
	case util.IsErrBadRequest(err):
		code = http.StatusBadRequest
	case util.IsErrNotFound(err):
		code = http.StatusNotFound
	}

	// The body is JSON: declare it, like httpResponse does, instead of leaving
	// the content type to be sniffed from the payload.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	// A failed write means the client went away; there is nobody left to
	// report the failure to, so the result is deliberately ignored.
	_, _ = w.Write(resj)

	return true
}
// httpResponse writes a JSON response with the given status code. When res is
// nil only the status code (and the JSON content type header) is sent. If res
// cannot be marshaled an error response is written through httpError and the
// marshal error is returned; otherwise the result of writing the body is
// returned.
func httpResponse(w http.ResponseWriter, code int, res interface{}) error {
	w.Header().Set("Content-Type", "application/json")

	if res == nil {
		w.WriteHeader(code)
		return nil
	}

	body, err := json.Marshal(res)
	if err != nil {
		httpError(w, err)
		return err
	}

	w.WriteHeader(code)
	_, werr := w.Write(body)
	return werr
}
// LogsHandler is the HTTP handler serving the logs of a run task step (or of
// the task setup phase).
type LogsHandler struct {
log *zap.SugaredLogger
// e is the etcd store used to look up the run, the executor task and the
// executor.
e *etcd.Store
// ost is the object storage from which already fetched logs are read.
ost *objectstorage.ObjStorage
// dm is passed, together with e, to store.GetRunEtcdOrOST to load the run.
dm *datamanager.DataManager
}
// NewLogsHandler returns a LogsHandler that logs through the sugared version
// of logger and uses the provided etcd store, object storage and data manager.
func NewLogsHandler(logger *zap.Logger, e *etcd.Store, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) *LogsHandler {
	h := &LogsHandler{e: e, ost: ost, dm: dm}
	h.log = logger.Sugar()
	return h
}
// ServeHTTP sends the log of a task step, or of the task setup phase.
//
// Query parameters:
//   - runid, taskid: required, identify the run and the task.
//   - setup: when present the setup log is requested.
//   - step: non negative index of the requested step. Exactly one of setup
//     and step must be provided.
//   - follow: when present the log is followed; it implies stream.
//   - stream: when present the log is sent as a text/event-stream.
//
// Malformed requests get a 400. Errors reported by readTaskLogs are logged
// and, when a response can still be sent, mapped to 404 for
// common.ErrNotExist and to 500 otherwise.
func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	// TODO(sgotti) Check authorized call from client

	q := r.URL.Query()

	runID := q.Get("runid")
	taskID := q.Get("taskid")
	if runID == "" || taskID == "" {
		http.Error(w, "", http.StatusBadRequest)
		return
	}

	_, setup := q["setup"]
	stepStr := q.Get("step")
	// Exactly one of "setup" and "step" must be provided.
	if setup == (stepStr != "") {
		http.Error(w, "", http.StatusBadRequest)
		return
	}

	var step int
	if stepStr != "" {
		var err error
		step, err = strconv.Atoi(stepStr)
		// A negative step is used as a slice index by readTaskLogs and would
		// panic there, so reject it together with unparsable values.
		if err != nil || step < 0 {
			http.Error(w, "", http.StatusBadRequest)
			return
		}
	}

	_, follow := q["follow"]
	_, stream := q["stream"]
	// Following a log always streams it.
	stream = stream || follow

	err, sendError := h.readTaskLogs(ctx, runID, taskID, setup, step, w, follow, stream)
	if err == nil {
		return
	}
	h.log.Errorf("err: %+v", err)

	// Once log data may have been written no error response can be sent.
	if !sendError {
		return
	}
	switch err.(type) {
	case common.ErrNotExist:
		http.Error(w, err.Error(), http.StatusNotFound)
	default:
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}
// readTaskLogs writes to w the log of step number step (or the setup log when
// setup is true) of task taskID in run runID.
//
// When the log phase of the step is types.RunTaskFetchPhaseFinished the log is
// read from the object storage; otherwise it is requested from the executor
// that owns the task, forwarding the follow flag. stream is passed to sendLogs
// and selects the event stream output format.
//
// It returns the error, if any, and a boolean telling the caller whether an
// error response can still be sent: it is false once sendLogs has been called,
// since part of the response may already have been written. A missing run,
// task, step, stored log or executor is reported through
// common.NewErrNotExist.
func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, setup bool, step int, w http.ResponseWriter, follow, stream bool) (error, bool) {
	r, err := store.GetRunEtcdOrOST(ctx, h.e, h.dm, runID)
	if err != nil {
		return err, true
	}
	if r == nil {
		return common.NewErrNotExist(errors.Errorf("no such run with id: %s", runID)), true
	}
	task, ok := r.Tasks[taskID]
	if !ok {
		return common.NewErrNotExist(errors.Errorf("no such task with ID %s in run %s", taskID, runID)), true
	}
	// step is used as an index below: reject negative values too, they would
	// otherwise pass the upper bound check and panic.
	if step < 0 || step >= len(task.Steps) {
		return common.NewErrNotExist(errors.Errorf("no such step for task %s in run %s", taskID, runID)), true
	}

	// if the log has been already fetched use it, otherwise fetch it from the executor
	// NOTE(review): for setup logs step is 0, so the log phase of the first
	// step is the one checked here — confirm this is the intended phase.
	if task.Steps[step].LogPhase == types.RunTaskFetchPhaseFinished {
		var logPath string
		if setup {
			logPath = store.OSTRunTaskSetupLogPath(task.ID)
		} else {
			logPath = store.OSTRunTaskStepLogPath(task.ID, step)
		}
		f, err := h.ost.ReadObject(logPath)
		if err != nil {
			if err == objectstorage.ErrNotExist {
				return common.NewErrNotExist(err), true
			}
			return err, true
		}
		defer f.Close()
		return sendLogs(w, f, stream), false
	}

	et, err := store.GetExecutorTask(ctx, h.e, task.ID)
	if err != nil {
		return err, true
	}
	executor, err := store.GetExecutor(ctx, h.e, et.Status.ExecutorID)
	if err != nil && err != etcd.ErrKeyNotFound {
		return err, true
	}
	if executor == nil {
		return common.NewErrNotExist(errors.Errorf("executor with id %q doesn't exist", et.Status.ExecutorID)), true
	}

	var logsURL string
	if setup {
		logsURL = fmt.Sprintf("%s/api/v1alpha/executor/logs?taskid=%s&setup", executor.ListenURL, taskID)
	} else {
		logsURL = fmt.Sprintf("%s/api/v1alpha/executor/logs?taskid=%s&step=%d", executor.ListenURL, taskID, step)
	}
	if follow {
		logsURL += "&follow"
	}

	// Bind the executor request to ctx so that it is cancelled as soon as the
	// client request is, instead of lingering until a write fails.
	req, err := http.NewRequest(http.MethodGet, logsURL, nil)
	if err != nil {
		return err, true
	}
	resp, err := http.DefaultClient.Do(req.WithContext(ctx))
	if err != nil {
		return err, true
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		if resp.StatusCode == http.StatusNotFound {
			return common.NewErrNotExist(errors.New("no log on executor")), true
		}
		return errors.Errorf("received http status: %d", resp.StatusCode), true
	}

	return sendLogs(w, resp.Body, stream), false
}
// sendLogs copies r to w one line at a time, flushing after every line when w
// implements http.Flusher.
//
// When stream is true the content type is set to text/event-stream and every
// line (including its trailing newline, if any) is written as
// "data: <line>\n". Otherwise lines are written unchanged. A final line
// without a trailing newline is sent as well. Read errors other than io.EOF
// and write errors are returned.
func sendLogs(w http.ResponseWriter, r io.Reader, stream bool) error {
	hdr := w.Header()
	if stream {
		hdr.Set("Content-Type", "text/event-stream")
	}
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")

	flusher, canFlush := w.(http.Flusher)
	lines := bufio.NewReader(r)

	for {
		line, rerr := lines.ReadBytes('\n')
		if rerr != nil && rerr != io.EOF {
			return rerr
		}
		atEOF := rerr == io.EOF
		if atEOF && len(line) == 0 {
			return nil
		}

		var werr error
		if stream {
			_, werr = fmt.Fprintf(w, "data: %s\n", line)
		} else {
			_, werr = w.Write(line)
		}
		if werr != nil {
			return werr
		}

		if canFlush {
			flusher.Flush()
		}
		// The last, unterminated, line has been sent: nothing more to read.
		if atEOF {
			return nil
		}
	}
}
// ChangeGroupsUpdateTokensHandler is the HTTP handler returning the update
// tokens of the requested change groups.
type ChangeGroupsUpdateTokensHandler struct {
log *zap.SugaredLogger
// readDB is queried for the change groups update tokens.
readDB *readdb.ReadDB
}
// NewChangeGroupsUpdateTokensHandler returns a ChangeGroupsUpdateTokensHandler
// that logs through the sugared version of logger and reads from readDB.
func NewChangeGroupsUpdateTokensHandler(logger *zap.Logger, readDB *readdb.ReadDB) *ChangeGroupsUpdateTokensHandler {
	h := &ChangeGroupsUpdateTokensHandler{readDB: readDB}
	h.log = logger.Sugar()
	return h
}
// ServeHTTP replies with the marshaled update token of the change groups
// listed in the "changegroup" query parameters. Read and marshal failures are
// reported with a 500.
func (h *ChangeGroupsUpdateTokensHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	changeGroups := r.URL.Query()["changegroup"]

	var token *types.ChangeGroupsUpdateToken
	lookup := func(tx *db.Tx) (err error) {
		token, err = h.readDB.GetChangeGroupsUpdateTokens(tx, changeGroups)
		return err
	}
	if err := h.readDB.Do(lookup); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	encoded, err := types.MarshalChangeGroupsUpdateToken(token)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if err := httpResponse(w, http.StatusOK, encoded); err != nil {
		h.log.Errorf("err: %+v", err)
	}
}
// RunResponse is the JSON body returned by the handlers that reply with a
// single run.
type RunResponse struct {
// Run is the run itself.
Run *types.Run `json:"run"`
// RunConfig is the configuration of Run.
RunConfig *types.RunConfig `json:"run_config"`
// ChangeGroupsUpdateToken is the marshaled update token of the change groups
// requested by the client, when the handler provides one.
ChangeGroupsUpdateToken string `json:"change_groups_update_tokens"`
}
// RunHandler is the HTTP handler returning a single run with its run config.
type RunHandler struct {
log *zap.SugaredLogger
// e is the etcd store received by NewRunHandler.
e *etcd.Store
// dm is the data manager from which the run config is loaded.
dm *datamanager.DataManager
// readDB is queried for the run and the change groups update tokens.
readDB *readdb.ReadDB
}
// NewRunHandler returns a RunHandler that logs through the sugared version of
// logger and uses the provided etcd store, data manager and read db.
func NewRunHandler(logger *zap.Logger, e *etcd.Store, dm *datamanager.DataManager, readDB *readdb.ReadDB) *RunHandler {
	h := &RunHandler{e: e, dm: dm, readDB: readDB}
	h.log = logger.Sugar()
	return h
}
// ServeHTTP replies with the run identified by the "runid" route variable,
// its run config and the update token of the change groups listed in the
// "changegroup" query parameters.
//
// It answers 404 when no run is found and 500 when reading the run, the
// change groups update tokens or the run config fails.
func (h *RunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	runID := vars["runid"]

	query := r.URL.Query()
	changeGroups := query["changegroup"]

	var run *types.Run
	var cgt *types.ChangeGroupsUpdateToken

	err := h.readDB.Do(func(tx *db.Tx) error {
		var err error
		run, err = h.readDB.GetRun(tx, runID)
		if err != nil {
			h.log.Errorf("err: %+v", err)
			return err
		}

		cgt, err = h.readDB.GetChangeGroupsUpdateTokens(tx, changeGroups)
		return err
	})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Guard against a nil run returned without an error: dereferencing it
	// below would panic, while the right answer for the client is not found.
	if run == nil {
		http.Error(w, "", http.StatusNotFound)
		return
	}

	cgts, err := types.MarshalChangeGroupsUpdateToken(cgt)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	rc, err := store.OSTGetRunConfig(h.dm, run.ID)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	res := &RunResponse{
		Run:                     run,
		RunConfig:               rc,
		ChangeGroupsUpdateToken: cgts,
	}

	if err := httpResponse(w, http.StatusOK, res); err != nil {
		h.log.Errorf("err: %+v", err)
	}
}
const (
// DefaultRunsLimit is the number of runs requested by RunsHandler when the
// client does not provide a "limit" query parameter.
DefaultRunsLimit = 25
// MaxRunsLimit is the upper bound RunsHandler applies to the requested
// limit; greater values are lowered to it.
MaxRunsLimit = 40
)
// GetRunsResponse is the JSON body returned by RunsHandler.
type GetRunsResponse struct {
// Runs are the runs matching the request.
Runs []*types.Run `json:"runs"`
// ChangeGroupsUpdateToken is the marshaled update token of the change groups
// requested by the client.
ChangeGroupsUpdateToken string `json:"change_groups_update_tokens"`
}
// RunsHandler is the HTTP handler listing runs.
type RunsHandler struct {
log *zap.SugaredLogger
// readDB is queried for the runs and the change groups update tokens.
readDB *readdb.ReadDB
}
// NewRunsHandler returns a RunsHandler that logs through the sugared version
// of logger and reads from readDB.
func NewRunsHandler(logger *zap.Logger, readDB *readdb.ReadDB) *RunsHandler {
	h := &RunsHandler{readDB: readDB}
	h.log = logger.Sugar()
	return h
}
// ServeHTTP replies with the runs selected by the query parameters and with
// the update token of the requested change groups.
//
// Query parameters:
//   - phase: run phases to filter on (repeatable).
//   - group: groups passed to the runs query (repeatable).
//   - changegroup: change groups whose update token is returned (repeatable).
//   - lastrun: flag passed to the runs query when present.
//   - limit: maximum number of runs; defaults to DefaultRunsLimit, must not be
//     negative and is capped to MaxRunsLimit.
//   - asc: when present runs are sorted ascending instead of descending.
//   - start: start value passed to the runs query.
func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	params := r.URL.Query()

	phases := types.RunPhaseFromStringSlice(params["phase"])
	changeGroups := params["changegroup"]
	groups := params["group"]
	_, lastRun := params["lastrun"]

	limit := DefaultRunsLimit
	if raw := params.Get("limit"); raw != "" {
		parsed, err := strconv.Atoi(raw)
		if err != nil {
			http.Error(w, "", http.StatusBadRequest)
			return
		}
		limit = parsed
	}
	switch {
	case limit < 0:
		http.Error(w, "limit must be greater or equal than 0", http.StatusBadRequest)
		return
	case limit > MaxRunsLimit:
		limit = MaxRunsLimit
	}

	sortOrder := types.SortOrderDesc
	if _, asc := params["asc"]; asc {
		sortOrder = types.SortOrderAsc
	}

	start := params.Get("start")

	var (
		runs  []*types.Run
		token *types.ChangeGroupsUpdateToken
	)
	load := func(tx *db.Tx) error {
		var err error
		if runs, err = h.readDB.GetRuns(tx, groups, lastRun, phases, start, limit, sortOrder); err != nil {
			h.log.Errorf("err: %+v", err)
			return err
		}

		token, err = h.readDB.GetChangeGroupsUpdateTokens(tx, changeGroups)
		return err
	}
	if err := h.readDB.Do(load); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	encoded, err := types.MarshalChangeGroupsUpdateToken(token)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	res := &GetRunsResponse{Runs: runs, ChangeGroupsUpdateToken: encoded}
	if err := httpResponse(w, http.StatusOK, res); err != nil {
		h.log.Errorf("err: %+v", err)
	}
}
// RunCreateRequest is the JSON body accepted by RunCreateHandler. Its fields
// are copied one to one into an action.RunCreateRequest.
type RunCreateRequest struct {
// new run fields
RunConfigTasks map[string]*types.RunConfigTask `json:"run_config_tasks"`
Name string `json:"name"`
Group string `json:"group"`
SetupErrors []string `json:"setup_errors"`
StaticEnvironment map[string]string `json:"static_environment"`
// existing run fields
// NOTE(review): presumably these select an already existing run to create
// the new one from — confirm against action.CreateRun.
RunID string `json:"run_id"`
FromStart bool `json:"from_start"`
ResetTasks []string `json:"reset_tasks"`
// common fields
Environment map[string]string `json:"environment"`
Annotations map[string]string `json:"annotations"`
// NOTE(review): every other request/response in this file uses the JSON name
// "change_groups_update_tokens"; "changeup_update_tokens" looks like a typo
// but is part of the wire format, so clients must be checked before renaming.
ChangeGroupsUpdateToken string `json:"changeup_update_tokens"`
}
// RunCreateHandler is the HTTP handler creating runs.
type RunCreateHandler struct {
log *zap.SugaredLogger
// ah performs the run creation.
ah *action.ActionHandler
}
// NewRunCreateHandler returns a RunCreateHandler that logs through the sugared
// version of logger and creates runs through ah.
func NewRunCreateHandler(logger *zap.Logger, ah *action.ActionHandler) *RunCreateHandler {
	h := &RunCreateHandler{ah: ah}
	h.log = logger.Sugar()
	return h
}
// ServeHTTP decodes a RunCreateRequest from the request body, creates the run
// through the action handler and replies with a 201 carrying the created run
// and its run config. An undecodable body gets a 400; creation errors are
// logged and reported through httpError.
func (h *RunCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	var body RunCreateRequest
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	bundle, err := h.ah.CreateRun(r.Context(), &action.RunCreateRequest{
		RunConfigTasks:          body.RunConfigTasks,
		Name:                    body.Name,
		Group:                   body.Group,
		SetupErrors:             body.SetupErrors,
		StaticEnvironment:       body.StaticEnvironment,
		RunID:                   body.RunID,
		FromStart:               body.FromStart,
		ResetTasks:              body.ResetTasks,
		Environment:             body.Environment,
		Annotations:             body.Annotations,
		ChangeGroupsUpdateToken: body.ChangeGroupsUpdateToken,
	})
	if err != nil {
		h.log.Errorf("err: %+v", err)
		httpError(w, err)
		return
	}

	created := &RunResponse{Run: bundle.Run, RunConfig: bundle.Rc}
	if err := httpResponse(w, http.StatusCreated, created); err != nil {
		h.log.Errorf("err: %+v", err)
	}
}
// RunActionType identifies the action requested on a run.
type RunActionType string
const (
// RunActionTypeChangePhase asks to change the phase of the run.
RunActionTypeChangePhase RunActionType = "changephase"
// RunActionTypeStop asks to stop the run.
RunActionTypeStop RunActionType = "stop"
)
// RunActionsRequest is the JSON body accepted by RunActionsHandler.
type RunActionsRequest struct {
// ActionType selects the action to execute.
ActionType RunActionType `json:"action_type"`
// Phase is the phase requested by a RunActionTypeChangePhase action.
Phase types.RunPhase `json:"phase"`
// ChangeGroupsUpdateToken is forwarded to the action handler with every
// action.
ChangeGroupsUpdateToken string `json:"change_groups_update_tokens"`
}
// RunActionsHandler is the HTTP handler executing actions on a run.
type RunActionsHandler struct {
log *zap.SugaredLogger
// ah executes the requested action.
ah *action.ActionHandler
// readDB is not set by NewRunActionsHandler and not used by ServeHTTP.
readDB *readdb.ReadDB
}
// NewRunActionsHandler returns a RunActionsHandler that logs through the
// sugared version of logger and executes actions through ah.
func NewRunActionsHandler(logger *zap.Logger, ah *action.ActionHandler) *RunActionsHandler {
	h := &RunActionsHandler{ah: ah}
	h.log = logger.Sugar()
	return h
}
// ServeHTTP decodes a RunActionsRequest from the request body and executes the
// requested action on the run identified by the "runid" route variable. An
// undecodable body or an unknown action type gets a 400; action errors are
// logged and reported through httpError. Nothing is written on success.
func (h *RunActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	runID := mux.Vars(r)["runid"]

	var body RunActionsRequest
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	var err error
	switch body.ActionType {
	case RunActionTypeChangePhase:
		err = h.ah.ChangeRunPhase(ctx, &action.RunChangePhaseRequest{
			RunID:                   runID,
			Phase:                   body.Phase,
			ChangeGroupsUpdateToken: body.ChangeGroupsUpdateToken,
		})
	case RunActionTypeStop:
		err = h.ah.StopRun(ctx, &action.RunStopRequest{
			RunID:                   runID,
			ChangeGroupsUpdateToken: body.ChangeGroupsUpdateToken,
		})
	default:
		http.Error(w, "", http.StatusBadRequest)
		return
	}

	if err != nil {
		h.log.Errorf("err: %+v", err)
		httpError(w, err)
	}
}
// RunTaskActionType identifies the action requested on a run task.
type RunTaskActionType string
const (
// RunTaskActionTypeSetAnnotations asks to set the annotations of the task.
RunTaskActionTypeSetAnnotations RunTaskActionType = "setannotations"
// RunTaskActionTypeApprove asks to approve the task.
RunTaskActionTypeApprove RunTaskActionType = "approve"
)
// RunTaskActionsRequest is the JSON body accepted by RunTaskActionsHandler.
type RunTaskActionsRequest struct {
// ActionType selects the action to execute.
ActionType RunTaskActionType `json:"action_type"`
// set Annotations fields
// Annotations are the annotations forwarded by a
// RunTaskActionTypeSetAnnotations action.
Annotations map[string]string `json:"annotations,omitempty"`
// global fields
// ChangeGroupsUpdateToken is forwarded to the action handler with every
// action.
ChangeGroupsUpdateToken string `json:"change_groups_update_tokens"`
}
// RunTaskActionsHandler is the HTTP handler executing actions on a run task.
type RunTaskActionsHandler struct {
log *zap.SugaredLogger
// ah executes the requested action.
ah *action.ActionHandler
// readDB is not set by NewRunTaskActionsHandler and not used by ServeHTTP.
readDB *readdb.ReadDB
}
// NewRunTaskActionsHandler returns a RunTaskActionsHandler that logs through
// the sugared version of logger and executes actions through ah.
func NewRunTaskActionsHandler(logger *zap.Logger, ah *action.ActionHandler) *RunTaskActionsHandler {
	h := &RunTaskActionsHandler{ah: ah}
	h.log = logger.Sugar()
	return h
}
// ServeHTTP decodes a RunTaskActionsRequest from the request body and executes
// the requested action on the task identified by the "runid" and "taskid"
// route variables. An undecodable body or an unknown action type gets a 400;
// action errors are logged and reported through httpError. Nothing is written
// on success.
func (h *RunTaskActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	routeVars := mux.Vars(r)
	runID, taskID := routeVars["runid"], routeVars["taskid"]

	var body RunTaskActionsRequest
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	var err error
	switch body.ActionType {
	case RunTaskActionTypeSetAnnotations:
		err = h.ah.RunTaskSetAnnotations(ctx, &action.RunTaskSetAnnotationsRequest{
			RunID:                   runID,
			TaskID:                  taskID,
			Annotations:             body.Annotations,
			ChangeGroupsUpdateToken: body.ChangeGroupsUpdateToken,
		})
	case RunTaskActionTypeApprove:
		err = h.ah.ApproveRunTask(ctx, &action.RunTaskApproveRequest{
			RunID:                   runID,
			TaskID:                  taskID,
			ChangeGroupsUpdateToken: body.ChangeGroupsUpdateToken,
		})
	default:
		http.Error(w, "", http.StatusBadRequest)
		return
	}

	if err != nil {
		h.log.Errorf("err: %+v", err)
		httpError(w, err)
	}
}