agola/internal/services/gateway/api/run.go
Simone Gotti 8bde2f2bc0 runservice: implement caching
Add `save_cache` and `restore_cache steps`
2019-04-13 14:58:56 +02:00

596 lines
15 KiB
Go

// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"bufio"
"encoding/json"
"io"
"net/http"
"strconv"
"time"
"github.com/pkg/errors"
rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api"
rstypes "github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util"
"go.uber.org/zap"
"github.com/gorilla/mux"
)
type RunsResponse struct {
ID string `json:"id"`
Counter uint64 `json:"counter"`
Name string `json:"name"`
Annotations map[string]string `json:"annotations"`
Phase rstypes.RunPhase `json:"phase"`
Result rstypes.RunResult `json:"result"`
TasksWaitingApproval []string `json:"tasks_waiting_approval"`
EnqueueTime *time.Time `json:"enqueue_time"`
StartTime *time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time"`
}
type RunResponse struct {
ID string `json:"id"`
Counter uint64 `json:"counter"`
Name string `json:"name"`
Annotations map[string]string `json:"annotations"`
Phase rstypes.RunPhase `json:"phase"`
Result rstypes.RunResult `json:"result"`
SetupErrors []string `json:"setup_errors"`
Tasks map[string]*RunResponseTask `json:"tasks"`
TasksWaitingApproval []string `json:"tasks_waiting_approval"`
EnqueueTime *time.Time `json:"enqueue_time"`
StartTime *time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time"`
CanRestartFromScratch bool `json:"can_restart_from_scratch"`
CanRestartFromFailedTasks bool `json:"can_restart_from_failed_tasks"`
}
type RunResponseTask struct {
ID string `json:"id"`
Name string `json:"name"`
Status rstypes.RunTaskStatus `json:"status"`
Level int `json:"level"`
Depends map[string]*rstypes.RunConfigTaskDepend `json:"depends"`
WaitingApproval bool `json:"waiting_approval"`
Approved bool `json:"approved"`
ApprovalAnnotations map[string]string `json:"approval_annotations"`
StartTime *time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time"`
}
type RunTaskResponse struct {
ID string `json:"id"`
Name string `json:"name"`
Status rstypes.RunTaskStatus `json:"status"`
WaitingApproval bool `json:"waiting_approval"`
Approved bool `json:"approved"`
ApprovalAnnotations map[string]string `json:"approval_annotations"`
SetupStep *RunTaskResponseSetupStep `json:"setup_step"`
Steps []*RunTaskResponseStep `json:"steps"`
StartTime *time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time"`
}
type RunTaskResponseSetupStep struct {
Phase rstypes.ExecutorTaskPhase `json:"phase"`
Name string `json:"name"`
StartTime *time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time"`
}
type RunTaskResponseStep struct {
Phase rstypes.ExecutorTaskPhase `json:"phase"`
Name string `json:"name"`
Command string `json:"command"`
StartTime *time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time"`
}
func createRunResponse(r *rstypes.Run, rc *rstypes.RunConfig) *RunResponse {
run := &RunResponse{
ID: r.ID,
Counter: r.Counter,
Name: r.Name,
Annotations: r.Annotations,
Phase: r.Phase,
Result: r.Result,
SetupErrors: rc.SetupErrors,
Tasks: make(map[string]*RunResponseTask),
TasksWaitingApproval: r.TasksWaitingApproval(),
EnqueueTime: r.EnqueueTime,
StartTime: r.StartTime,
EndTime: r.EndTime,
}
run.CanRestartFromScratch, _ = r.CanRestartFromScratch()
run.CanRestartFromFailedTasks, _ = r.CanRestartFromFailedTasks()
for name, rt := range r.Tasks {
rct := rc.Tasks[rt.ID]
run.Tasks[name] = createRunResponseTask(r, rt, rct)
}
return run
}
func createRunResponseTask(r *rstypes.Run, rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *RunResponseTask {
t := &RunResponseTask{
ID: rt.ID,
Name: rct.Name,
Status: rt.Status,
StartTime: rt.StartTime,
EndTime: rt.EndTime,
WaitingApproval: rt.WaitingApproval,
Approved: rt.Approved,
ApprovalAnnotations: rt.ApprovalAnnotations,
Level: rct.Level,
Depends: rct.Depends,
}
return t
}
func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *RunTaskResponse {
t := &RunTaskResponse{
ID: rt.ID,
Name: rct.Name,
Status: rt.Status,
WaitingApproval: rt.WaitingApproval,
Approved: rt.Approved,
ApprovalAnnotations: rt.ApprovalAnnotations,
Steps: make([]*RunTaskResponseStep, len(rt.Steps)),
StartTime: rt.StartTime,
EndTime: rt.EndTime,
}
t.SetupStep = &RunTaskResponseSetupStep{
Name: "Task setup",
Phase: rt.SetupStep.Phase,
StartTime: rt.SetupStep.StartTime,
EndTime: rt.SetupStep.EndTime,
}
for i := 0; i < len(t.Steps); i++ {
s := &RunTaskResponseStep{
Phase: rt.Steps[i].Phase,
StartTime: rt.Steps[i].StartTime,
EndTime: rt.Steps[i].EndTime,
}
rcts := rct.Steps[i]
switch rcts := rcts.(type) {
case *rstypes.RunStep:
s.Name = rcts.Name
s.Command = rcts.Command
case *rstypes.SaveToWorkspaceStep:
s.Name = "save to workspace"
case *rstypes.RestoreWorkspaceStep:
s.Name = "restore workspace"
case *rstypes.SaveCacheStep:
s.Name = "save cache"
case *rstypes.RestoreCacheStep:
s.Name = "restore cache"
}
t.Steps[i] = s
}
return t
}
type RunHandler struct {
log *zap.SugaredLogger
runserviceClient *rsapi.Client
}
func NewRunHandler(logger *zap.Logger, runserviceClient *rsapi.Client) *RunHandler {
return &RunHandler{log: logger.Sugar(), runserviceClient: runserviceClient}
}
func (h *RunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
runID := vars["runid"]
runResp, resp, err := h.runserviceClient.GetRun(ctx, runID)
if httpErrorFromRemote(w, resp, err) {
h.log.Errorf("err: %+v", err)
return
}
res := createRunResponse(runResp.Run, runResp.RunConfig)
if err := httpResponse(w, http.StatusOK, res); err != nil {
h.log.Errorf("err: %+v", err)
}
}
type RuntaskHandler struct {
log *zap.SugaredLogger
runserviceClient *rsapi.Client
}
func NewRuntaskHandler(logger *zap.Logger, runserviceClient *rsapi.Client) *RuntaskHandler {
return &RuntaskHandler{log: logger.Sugar(), runserviceClient: runserviceClient}
}
func (h *RuntaskHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
runID := vars["runid"]
taskID := vars["taskid"]
runResp, resp, err := h.runserviceClient.GetRun(ctx, runID)
if httpErrorFromRemote(w, resp, err) {
h.log.Errorf("err: %+v", err)
return
}
run := runResp.Run
rc := runResp.RunConfig
rt, ok := run.Tasks[taskID]
if !ok {
httpError(w, util.NewErrNotFound(errors.Errorf("run %q task %q not found", runID, taskID)))
return
}
rct := rc.Tasks[rt.ID]
res := createRunTaskResponse(rt, rct)
if err := httpResponse(w, http.StatusOK, res); err != nil {
h.log.Errorf("err: %+v", err)
}
}
const (
DefaultRunsLimit = 25
MaxRunsLimit = 40
)
func createRunsResponse(r *rstypes.Run) *RunsResponse {
run := &RunsResponse{
ID: r.ID,
Counter: r.Counter,
Name: r.Name,
Annotations: r.Annotations,
Phase: r.Phase,
Result: r.Result,
TasksWaitingApproval: r.TasksWaitingApproval(),
EnqueueTime: r.EnqueueTime,
StartTime: r.StartTime,
EndTime: r.EndTime,
}
return run
}
type RunsHandler struct {
log *zap.SugaredLogger
runserviceClient *rsapi.Client
}
func NewRunsHandler(logger *zap.Logger, runserviceClient *rsapi.Client) *RunsHandler {
return &RunsHandler{log: logger.Sugar(), runserviceClient: runserviceClient}
}
func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
q := r.URL.Query()
groups := q["group"]
// we require that groups are specified to not return all runs
if len(groups) == 0 {
httpError(w, util.NewErrBadRequest(errors.Errorf("no groups specified")))
return
}
phaseFilter := q["phase"]
changeGroups := q["changegroup"]
_, lastRun := q["lastrun"]
limitS := q.Get("limit")
limit := DefaultRunsLimit
if limitS != "" {
var err error
limit, err = strconv.Atoi(limitS)
if err != nil {
httpError(w, util.NewErrBadRequest(errors.Wrapf(err, "cannot parse limit")))
return
}
}
if limit < 0 {
httpError(w, util.NewErrBadRequest(errors.Errorf("limit must be greater or equal than 0")))
return
}
if limit > MaxRunsLimit {
limit = MaxRunsLimit
}
asc := false
if _, ok := q["asc"]; ok {
asc = true
}
start := q.Get("start")
runsResp, resp, err := h.runserviceClient.GetRuns(ctx, phaseFilter, groups, lastRun, changeGroups, start, limit, asc)
if httpErrorFromRemote(w, resp, err) {
h.log.Errorf("err: %+v", err)
return
}
runs := make([]*RunsResponse, len(runsResp.Runs))
for i, r := range runsResp.Runs {
runs[i] = createRunsResponse(r)
}
if err := httpResponse(w, http.StatusOK, runs); err != nil {
h.log.Errorf("err: %+v", err)
}
}
type RunActionType string
const (
RunActionTypeRestart RunActionType = "restart"
RunActionTypeStop RunActionType = "stop"
)
type RunActionsRequest struct {
ActionType RunActionType `json:"action_type"`
// Restart
FromStart bool `json:"from_start"`
}
type RunActionsHandler struct {
log *zap.SugaredLogger
runserviceClient *rsapi.Client
}
func NewRunActionsHandler(logger *zap.Logger, runserviceClient *rsapi.Client) *RunActionsHandler {
return &RunActionsHandler{log: logger.Sugar(), runserviceClient: runserviceClient}
}
func (h *RunActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
runID := vars["runid"]
var req RunActionsRequest
d := json.NewDecoder(r.Body)
if err := d.Decode(&req); err != nil {
httpError(w, util.NewErrBadRequest(err))
return
}
switch req.ActionType {
case RunActionTypeRestart:
rsreq := &rsapi.RunCreateRequest{
RunID: runID,
FromStart: req.FromStart,
}
resp, err := h.runserviceClient.CreateRun(ctx, rsreq)
if httpErrorFromRemote(w, resp, err) {
h.log.Errorf("err: %+v", err)
return
}
case RunActionTypeStop:
rsreq := &rsapi.RunActionsRequest{
ActionType: rsapi.RunActionTypeStop,
}
resp, err := h.runserviceClient.RunActions(ctx, runID, rsreq)
if httpErrorFromRemote(w, resp, err) {
h.log.Errorf("err: %+v", err)
return
}
}
}
type RunTaskActionType string
const (
RunTaskActionTypeApprove RunTaskActionType = "approve"
)
type RunTaskActionsRequest struct {
ActionType RunTaskActionType `json:"action_type"`
ApprovalAnnotations map[string]string `json:"approval_annotations,omitempty"`
}
type RunTaskActionsHandler struct {
log *zap.SugaredLogger
runserviceClient *rsapi.Client
}
func NewRunTaskActionsHandler(logger *zap.Logger, runserviceClient *rsapi.Client) *RunTaskActionsHandler {
return &RunTaskActionsHandler{log: logger.Sugar(), runserviceClient: runserviceClient}
}
func (h *RunTaskActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
runID := vars["runid"]
taskID := vars["taskid"]
var req RunTaskActionsRequest
d := json.NewDecoder(r.Body)
if err := d.Decode(&req); err != nil {
httpError(w, util.NewErrBadRequest(err))
return
}
switch req.ActionType {
case RunTaskActionTypeApprove:
rsreq := &rsapi.RunTaskActionsRequest{
ActionType: rsapi.RunTaskActionTypeApprove,
ApprovalAnnotations: req.ApprovalAnnotations,
}
resp, err := h.runserviceClient.RunTaskActions(ctx, runID, taskID, rsreq)
if httpErrorFromRemote(w, resp, err) {
h.log.Errorf("err: %+v", err)
return
}
default:
httpError(w, util.NewErrBadRequest(errors.Errorf("wrong action type %q", req.ActionType)))
return
}
}
type LogsHandler struct {
log *zap.SugaredLogger
runserviceClient *rsapi.Client
}
func NewLogsHandler(logger *zap.Logger, runserviceClient *rsapi.Client) *LogsHandler {
return &LogsHandler{log: logger.Sugar(), runserviceClient: runserviceClient}
}
func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
q := r.URL.Query()
runID := q.Get("runID")
if runID == "" {
httpError(w, util.NewErrBadRequest(errors.Errorf("empty run id")))
return
}
taskID := q.Get("taskID")
if taskID == "" {
httpError(w, util.NewErrBadRequest(errors.Errorf("empty task id")))
return
}
_, setup := q["setup"]
stepStr := q.Get("step")
if !setup && stepStr == "" {
httpError(w, util.NewErrBadRequest(errors.Errorf("no setup or step number provided")))
return
}
if setup && stepStr != "" {
httpError(w, util.NewErrBadRequest(errors.Errorf("both setup and step number provided")))
return
}
var step int
if stepStr != "" {
var err error
step, err = strconv.Atoi(stepStr)
if err != nil {
httpError(w, util.NewErrBadRequest(errors.Wrapf(err, "cannot parse step number")))
return
}
}
follow := false
if _, ok := q["follow"]; ok {
follow = true
}
stream := false
if _, ok := q["stream"]; ok {
stream = true
}
if follow {
stream = true
}
resp, err := h.runserviceClient.GetLogs(ctx, runID, taskID, setup, step, follow, stream)
if httpErrorFromRemote(w, resp, err) {
h.log.Errorf("err: %+v", err)
return
}
if stream {
w.Header().Set("Content-Type", "text/event-stream")
}
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
defer resp.Body.Close()
if stream {
if err := sendLogs(w, resp.Body); err != nil {
h.log.Errorf("err: %+v", err)
return
}
} else {
if _, err := io.Copy(w, resp.Body); err != nil {
h.log.Errorf("err: %+v", err)
return
}
}
}
// sendLogs is used during streaming to flush logs lines
// TODO(sgotti) there's no need to do br.ReadBytes since the response is
// already flushed by the runservice.
func sendLogs(w io.Writer, r io.Reader) error {
br := bufio.NewReader(r)
var flusher http.Flusher
if fl, ok := w.(http.Flusher); ok {
flusher = fl
}
stop := false
for {
if stop {
return nil
}
data, err := br.ReadBytes('\n')
if err != nil {
if err != io.EOF {
return err
}
if len(data) == 0 {
return nil
}
stop = true
}
if _, err := w.Write(data); err != nil {
return err
}
if flusher != nil {
flusher.Flush()
}
}
}