runservice: implement caching

Add `save_cache` and `restore_cache steps`
This commit is contained in:
Simone Gotti 2019-04-13 14:58:56 +02:00
parent 57161477ca
commit 8bde2f2bc0
12 changed files with 639 additions and 54 deletions

View File

@ -147,7 +147,7 @@ type Value struct {
Value string Value string
} }
type SaveToWorkspaceContent struct { type SaveContent struct {
SourceDir string `yaml:"source_dir"` SourceDir string `yaml:"source_dir"`
DestDir string `yaml:"dest_dir"` DestDir string `yaml:"dest_dir"`
Paths []string `yaml:"paths"` Paths []string `yaml:"paths"`
@ -155,7 +155,7 @@ type SaveToWorkspaceContent struct {
type SaveToWorkspaceStep struct { type SaveToWorkspaceStep struct {
Step `yaml:",inline"` Step `yaml:",inline"`
Contents []SaveToWorkspaceContent Contents []*SaveContent `yaml:"contents"`
} }
type RestoreWorkspaceStep struct { type RestoreWorkspaceStep struct {
@ -163,6 +163,18 @@ type RestoreWorkspaceStep struct {
DestDir string `yaml:"dest_dir"` DestDir string `yaml:"dest_dir"`
} }
type SaveCacheStep struct {
Step `yaml:",inline"`
Key string `yaml:"key"`
Contents []*SaveContent `yaml:"contents"`
}
type RestoreCacheStep struct {
Step `yaml:",inline"`
Keys []string `yaml:"keys"`
DestDir string `yaml:"dest_dir"`
}
func (t *Task) UnmarshalYAML(unmarshal func(interface{}) error) error { func (t *Task) UnmarshalYAML(unmarshal func(interface{}) error) error {
type task struct { type task struct {
Name string `yaml:"name"` Name string `yaml:"name"`
@ -198,38 +210,54 @@ func (t *Task) UnmarshalYAML(unmarshal func(interface{}) error) error {
} }
switch stepType { switch stepType {
case "clone": case "clone":
var cs CloneStep var s CloneStep
cs.Type = stepType s.Type = stepType
steps[i] = &cs steps[i] = &s
case "run": case "run":
var rs RunStep var s RunStep
switch stepSpec.(type) { switch stepSpec.(type) {
case string: case string:
rs.Command = stepSpec.(string) s.Command = stepSpec.(string)
default: default:
if err := yaml.Unmarshal(o, &rs); err != nil { if err := yaml.Unmarshal(o, &s); err != nil {
return err return err
} }
} }
rs.Type = stepType s.Type = stepType
steps[i] = &rs steps[i] = &s
case "save_to_workspace": case "save_to_workspace":
var sws SaveToWorkspaceStep var s SaveToWorkspaceStep
if err := yaml.Unmarshal(o, &sws); err != nil { if err := yaml.Unmarshal(o, &s); err != nil {
return err return err
} }
sws.Type = stepType s.Type = stepType
steps[i] = &sws steps[i] = &s
case "restore_workspace": case "restore_workspace":
var rws RestoreWorkspaceStep var s RestoreWorkspaceStep
if err := yaml.Unmarshal(o, &rws); err != nil { if err := yaml.Unmarshal(o, &s); err != nil {
return err return err
} }
rws.Type = stepType s.Type = stepType
steps[i] = &rws steps[i] = &s
case "save_cache":
var s SaveCacheStep
if err := yaml.Unmarshal(o, &s); err != nil {
return err
}
s.Type = stepType
steps[i] = &s
case "restore_cache":
var s RestoreCacheStep
if err := yaml.Unmarshal(o, &s); err != nil {
return err
}
s.Type = stepType
steps[i] = &s
default: default:
return errors.Errorf("unknown step type: %s", stepType) return errors.Errorf("unknown step type: %s", stepType)
} }
@ -572,6 +600,25 @@ func ParseConfig(configData []byte) (*Config, error) {
} }
} }
// set steps defaults
for _, t := range config.Tasks {
for _, s := range t.Steps {
switch step := s.(type) {
// TODO(sgotti) we could use the run step command as step name but when the
// command is very long or multi line it doesn't makes sense and will
// probably be quite unuseful/confusing from an UI point of view
case *SaveCacheStep:
for _, content := range step.Contents {
if len(content.Paths) == 0 {
// default to all files inside the sourceDir
content.Paths = []string{"**"}
}
}
log.Infof("s: %s", util.Dump(s))
}
}
}
return &config, checkConfig(&config) return &config, checkConfig(&config)
} }

View File

@ -167,6 +167,10 @@ func TestParseOutput(t *testing.T) {
ENV01: ENV01 ENV01: ENV01
ENVFROMVARIABLE01: ENVFROMVARIABLE01:
from_variable: variable01 from_variable: variable01
- save_cache:
key: cache-{{ arch }}
contents:
- source_dir: /go/pkg/mod/cache
pipelines: pipelines:
pipeline01: pipeline01:
@ -247,6 +251,11 @@ func TestParseOutput(t *testing.T) {
"ENVFROMVARIABLE01": Value{Type: ValueTypeFromVariable, Value: "variable01"}, "ENVFROMVARIABLE01": Value{Type: ValueTypeFromVariable, Value: "variable01"},
}, },
}, },
&SaveCacheStep{
Step: Step{Type: "save_cache"},
Key: "cache-{{ arch }}",
Contents: []*SaveContent{&SaveContent{SourceDir: "/go/pkg/mod/cache", Paths: []string{"**"}}},
},
}, },
}, },
}, },

View File

@ -131,9 +131,9 @@ fi
sws.Type = cs.Type sws.Type = cs.Type
sws.Name = cs.Name sws.Name = cs.Name
sws.Contents = make([]rstypes.SaveToWorkspaceContent, len(cs.Contents)) sws.Contents = make([]rstypes.SaveContent, len(cs.Contents))
for i, csc := range cs.Contents { for i, csc := range cs.Contents {
sc := rstypes.SaveToWorkspaceContent{} sc := rstypes.SaveContent{}
sc.SourceDir = csc.SourceDir sc.SourceDir = csc.SourceDir
sc.DestDir = csc.DestDir sc.DestDir = csc.DestDir
sc.Paths = csc.Paths sc.Paths = csc.Paths
@ -150,6 +150,33 @@ fi
return rws return rws
case *config.SaveCacheStep:
sws := &rstypes.SaveCacheStep{}
sws.Type = cs.Type
sws.Name = cs.Name
sws.Key = cs.Key
sws.Contents = make([]rstypes.SaveContent, len(cs.Contents))
for i, csc := range cs.Contents {
sc := rstypes.SaveContent{}
sc.SourceDir = csc.SourceDir
sc.DestDir = csc.DestDir
sc.Paths = csc.Paths
sws.Contents[i] = sc
}
return sws
case *config.RestoreCacheStep:
rws := &rstypes.RestoreCacheStep{}
rws.Name = cs.Name
rws.Type = cs.Type
rws.Keys = cs.Keys
rws.DestDir = cs.DestDir
return rws
default: default:
panic(fmt.Errorf("unknown config step type: %s", util.Dump(cs))) panic(fmt.Errorf("unknown config step type: %s", util.Dump(cs)))
} }

View File

@ -200,6 +200,10 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *Run
s.Name = "save to workspace" s.Name = "save to workspace"
case *rstypes.RestoreWorkspaceStep: case *rstypes.RestoreWorkspaceStep:
s.Name = "restore workspace" s.Name = "restore workspace"
case *rstypes.SaveCacheStep:
s.Name = "save cache"
case *rstypes.RestoreCacheStep:
s.Name = "restore cache"
} }
t.Steps[i] = s t.Steps[i] = s

View File

@ -253,6 +253,42 @@ func (e *Executor) doSaveToWorkspaceStep(ctx context.Context, s *types.SaveToWor
return exitCode, nil return exitCode, nil
} }
func (e *Executor) template(ctx context.Context, t *types.ExecutorTask, pod driver.Pod, logf io.Writer, key string) (string, error) {
cmd := append([]string{toolboxContainerPath, "template"})
// limit the template answer to max 1MiB
stdout := util.NewLimitedBuffer(1024 * 1024)
execConfig := &driver.ExecConfig{
Cmd: cmd,
Env: t.Environment,
WorkingDir: t.WorkingDir,
Stdout: stdout,
Stderr: logf,
}
ce, err := pod.Exec(ctx, execConfig)
if err != nil {
return "", err
}
stdin := ce.Stdin()
go func() {
io.WriteString(stdin, key)
stdin.Close()
}()
exitCode, err := ce.Wait(ctx)
if err != nil {
return "", err
}
if exitCode != 0 {
return "", errors.Errorf("template ended with exit code %d", exitCode)
}
return stdout.String(), nil
}
func (e *Executor) unarchive(ctx context.Context, t *types.ExecutorTask, source io.Reader, pod driver.Pod, logf io.Writer, destDir string, overwrite, removeDestDir bool) error { func (e *Executor) unarchive(ctx context.Context, t *types.ExecutorTask, source io.Reader, pod driver.Pod, logf io.Writer, destDir string, overwrite, removeDestDir bool) error {
args := []string{"--destdir", destDir} args := []string{"--destdir", destDir}
if overwrite { if overwrite {
@ -328,6 +364,176 @@ func (e *Executor) doRestoreWorkspaceStep(ctx context.Context, s *types.RestoreW
return 0, nil return 0, nil
} }
func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep, t *types.ExecutorTask, pod driver.Pod, logPath string, archivePath string) (int, error) {
cmd := []string{toolboxContainerPath, "archive"}
if err := os.MkdirAll(filepath.Dir(logPath), 0770); err != nil {
return -1, err
}
logf, err := os.Create(logPath)
if err != nil {
return -1, err
}
defer logf.Close()
save := false
// calculate key from template
userKey, err := e.template(ctx, t, pod, logf, s.Key)
if err != nil {
return -1, err
}
fmt.Fprintf(logf, "cache key %q\n", userKey)
// append cache prefix
key := t.CachePrefix + "-" + userKey
// check that the cache key doesn't already exists
resp, err := e.runserviceClient.CheckCache(ctx, key, false)
if err != nil {
// ignore 404 errors since they means that the cache key doesn't exists
if resp != nil && resp.StatusCode == http.StatusNotFound {
fmt.Fprintf(logf, "no cache available for key %q. Saving.\n", userKey)
save = true
} else {
// TODO(sgotti) retry before giving up
fmt.Fprintf(logf, "error checking for cache key %q: %v\n", userKey, err)
return -1, err
}
}
if !save {
fmt.Fprintf(logf, "cache for key %q already exists\n", userKey)
return 0, nil
}
fmt.Fprintf(logf, "archiving cache with key %q\n", userKey)
if err := os.MkdirAll(filepath.Dir(archivePath), 0770); err != nil {
return -1, err
}
archivef, err := os.Create(archivePath)
if err != nil {
return -1, err
}
defer archivef.Close()
execConfig := &driver.ExecConfig{
Cmd: cmd,
Env: t.Environment,
WorkingDir: t.WorkingDir,
Stdout: archivef,
Stderr: logf,
}
ce, err := pod.Exec(ctx, execConfig)
if err != nil {
return -1, err
}
type ArchiveInfo struct {
SourceDir string
DestDir string
Paths []string
}
type Archive struct {
ArchiveInfos []*ArchiveInfo
OutFile string
}
a := &Archive{
OutFile: "", // use stdout
ArchiveInfos: make([]*ArchiveInfo, len(s.Contents)),
}
for i, c := range s.Contents {
a.ArchiveInfos[i] = &ArchiveInfo{
SourceDir: c.SourceDir,
DestDir: c.DestDir,
Paths: c.Paths,
}
}
stdin := ce.Stdin()
enc := json.NewEncoder(stdin)
go func() {
enc.Encode(a)
stdin.Close()
}()
exitCode, err := ce.Wait(ctx)
if err != nil {
return -1, err
}
if exitCode != 0 {
return exitCode, errors.Errorf("save cache archiving command ended with exit code %d", exitCode)
}
f, err := os.Open(archivePath)
if err != nil {
return -1, err
}
// send cache archive to scheduler
if resp, err := e.runserviceClient.PutCache(ctx, key, f); err != nil {
if resp != nil && resp.StatusCode == http.StatusNotModified {
return exitCode, nil
}
return -1, err
}
return exitCode, nil
}
func (e *Executor) doRestoreCacheStep(ctx context.Context, s *types.RestoreCacheStep, t *types.ExecutorTask, pod driver.Pod, logPath string) (int, error) {
if err := os.MkdirAll(filepath.Dir(logPath), 0770); err != nil {
return -1, err
}
logf, err := os.Create(logPath)
if err != nil {
return -1, err
}
defer logf.Close()
fmt.Fprintf(logf, "restoring cache: %s\n", util.Dump(s))
for _, key := range s.Keys {
// calculate key from template
userKey, err := e.template(ctx, t, pod, logf, key)
if err != nil {
return -1, err
}
fmt.Fprintf(logf, "cache key %q\n", userKey)
// append cache prefix
key := t.CachePrefix + "-" + userKey
resp, err := e.runserviceClient.GetCache(ctx, key, true)
if err != nil {
// ignore 404 errors since they means that the cache key doesn't exists
if resp != nil && resp.StatusCode == http.StatusNotFound {
fmt.Fprintf(logf, "no cache available for key %q\n", userKey)
continue
}
// TODO(sgotti) retry before giving up
fmt.Fprintf(logf, "error reading cache: %v\n", err)
return -1, err
}
fmt.Fprintf(logf, "restoring cache with key %q\n", userKey)
cachef := resp.Body
if err := e.unarchive(ctx, t, cachef, pod, logf, s.DestDir, false, false); err != nil {
cachef.Close()
return -1, err
}
cachef.Close()
// stop here
break
}
return 0, nil
}
func (e *Executor) executorIDPath() string { func (e *Executor) executorIDPath() string {
return filepath.Join(e.c.DataDir, "id") return filepath.Join(e.c.DataDir, "id")
} }
@ -570,6 +776,17 @@ func (e *Executor) executeTaskInternal(ctx context.Context, et *types.ExecutorTa
stepName = s.Name stepName = s.Name
exitCode, err = e.doRestoreWorkspaceStep(ctx, s, et, pod, e.stepLogPath(et.ID, i)) exitCode, err = e.doRestoreWorkspaceStep(ctx, s, et, pod, e.stepLogPath(et.ID, i))
case *types.SaveCacheStep:
log.Debugf("save cache step: %s", util.Dump(s))
stepName = s.Name
archivePath := e.archivePath(et.ID, i)
exitCode, err = e.doSaveCacheStep(ctx, s, et, pod, e.stepLogPath(et.ID, i), archivePath)
case *types.RestoreCacheStep:
log.Debugf("restore cache step: %s", util.Dump(s))
stepName = s.Name
exitCode, err = e.doRestoreCacheStep(ctx, s, et, pod, e.stepLogPath(et.ID, i))
default: default:
return i, errors.Errorf("unknown step type: %s", util.Dump(s)) return i, errors.Errorf("unknown step type: %s", util.Dump(s))
} }

View File

@ -141,6 +141,26 @@ func (c *Client) GetArchive(ctx context.Context, taskID string, step int) (*http
return c.getResponse(ctx, "GET", "/executor/archives", q, nil, nil) return c.getResponse(ctx, "GET", "/executor/archives", q, nil, nil)
} }
func (c *Client) CheckCache(ctx context.Context, key string, prefix bool) (*http.Response, error) {
q := url.Values{}
if prefix {
q.Add("prefix", "")
}
return c.getResponse(ctx, "HEAD", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), q, nil, nil)
}
func (c *Client) GetCache(ctx context.Context, key string, prefix bool) (*http.Response, error) {
q := url.Values{}
if prefix {
q.Add("prefix", "")
}
return c.getResponse(ctx, "GET", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), q, nil, nil)
}
func (c *Client) PutCache(ctx context.Context, key string, r io.Reader) (*http.Response, error) {
return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), nil, nil, r)
}
func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups []string, lastRun bool, changeGroups []string, start string, limit int, asc bool) (*GetRunsResponse, *http.Response, error) { func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups []string, lastRun bool, changeGroups []string, start string, limit int, asc bool) (*GetRunsResponse, *http.Response, error) {
q := url.Values{} q := url.Values{}
for _, phase := range phaseFilter { for _, phase := range phaseFilter {

View File

@ -19,6 +19,7 @@ import (
"encoding/json" "encoding/json"
"io" "io"
"net/http" "net/http"
"net/url"
"strconv" "strconv"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -168,7 +169,7 @@ func NewArchivesHandler(logger *zap.Logger, lts *objectstorage.ObjStorage) *Arch
} }
func (h *ArchivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *ArchivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// TODO(sgotti) Check authorized call from scheduler // TODO(sgotti) Check authorized call from executors
taskID := r.URL.Query().Get("taskid") taskID := r.URL.Query().Get("taskid")
if taskID == "" { if taskID == "" {
@ -216,6 +217,165 @@ func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error
return err return err
} }
type CacheHandler struct {
log *zap.SugaredLogger
lts *objectstorage.ObjStorage
}
func NewCacheHandler(logger *zap.Logger, lts *objectstorage.ObjStorage) *CacheHandler {
return &CacheHandler{
log: logger.Sugar(),
lts: lts,
}
}
func (h *CacheHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
// TODO(sgotti) Check authorized call from executors
key, err := url.PathUnescape(vars["key"])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if key == "" {
http.Error(w, "empty cache key", http.StatusBadRequest)
return
}
if len(key) > common.MaxCacheKeyLength {
http.Error(w, "cache key too long", http.StatusBadRequest)
return
}
query := r.URL.Query()
_, prefix := query["prefix"]
matchedKey, err := matchCache(h.lts, key, prefix)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if matchedKey == "" {
http.Error(w, "", http.StatusNotFound)
return
}
if r.Method == "HEAD" {
return
}
w.Header().Set("Cache-Control", "no-cache")
if err := h.readCache(matchedKey, w); err != nil {
switch err.(type) {
case common.ErrNotExist:
http.Error(w, err.Error(), http.StatusNotFound)
default:
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
}
func matchCache(lts *objectstorage.ObjStorage, key string, prefix bool) (string, error) {
cachePath := store.LTSCachePath(key)
if prefix {
doneCh := make(chan struct{})
defer close(doneCh)
// get the latest modified object
var lastObject *objectstorage.ObjectInfo
for object := range lts.List(store.LTSCacheDir()+"/"+key, "", false, doneCh) {
if object.Err != nil {
return "", object.Err
}
if (lastObject == nil) || (lastObject != nil && lastObject.LastModified.Before(object.LastModified)) {
lastObject = &object
}
}
if lastObject == nil {
return "", nil
}
return store.LTSCacheKey(lastObject.Path), nil
}
_, err := lts.Stat(cachePath)
if err == objectstorage.ErrNotExist {
return "", nil
}
if err != nil {
return "", err
}
return key, nil
}
func (h *CacheHandler) readCache(key string, w io.Writer) error {
cachePath := store.LTSCachePath(key)
f, err := h.lts.ReadObject(cachePath)
if err != nil {
if err == objectstorage.ErrNotExist {
return common.NewErrNotExist(err)
}
return err
}
defer f.Close()
br := bufio.NewReader(f)
_, err = io.Copy(w, br)
return err
}
type CacheCreateHandler struct {
log *zap.SugaredLogger
lts *objectstorage.ObjStorage
}
func NewCacheCreateHandler(logger *zap.Logger, lts *objectstorage.ObjStorage) *CacheCreateHandler {
return &CacheCreateHandler{
log: logger.Sugar(),
lts: lts,
}
}
func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
// TODO(sgotti) Check authorized call from executors
key, err := url.PathUnescape(vars["key"])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if key == "" {
http.Error(w, "empty cache key", http.StatusBadRequest)
return
}
if len(key) > common.MaxCacheKeyLength {
http.Error(w, "cache key too long", http.StatusBadRequest)
return
}
w.Header().Set("Cache-Control", "no-cache")
matchedKey, err := matchCache(h.lts, key, false)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if matchedKey != "" {
http.Error(w, "", http.StatusNotModified)
return
}
cachePath := store.LTSCachePath(key)
if err := h.lts.WriteObject(cachePath, r.Body); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
type ExecutorDeleteHandler struct { type ExecutorDeleteHandler struct {
log *zap.SugaredLogger log *zap.SugaredLogger
ch *command.CommandHandler ch *command.CommandHandler
@ -230,7 +390,6 @@ func NewExecutorDeleteHandler(logger *zap.Logger, ch *command.CommandHandler) *E
func (h *ExecutorDeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *ExecutorDeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
vars := mux.Vars(r) vars := mux.Vars(r)
// TODO(sgotti) Check authorized call from executors // TODO(sgotti) Check authorized call from executors

View File

@ -19,6 +19,10 @@ import (
"path" "path"
) )
const (
MaxCacheKeyLength = 200
)
type ErrNotExist struct { type ErrNotExist struct {
err error err error
} }

View File

@ -270,6 +270,7 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types
Shell: rct.Shell, Shell: rct.Shell,
User: rct.User, User: rct.User,
Steps: rct.Steps, Steps: rct.Steps,
CachePrefix: store.LTSRootGroup(r.Group),
Status: types.ExecutorTaskStatus{ Status: types.ExecutorTaskStatus{
Phase: types.ExecutorTaskPhaseNotStarted, Phase: types.ExecutorTaskPhaseNotStarted,
Steps: make([]*types.ExecutorTaskStepStatus, len(rct.Steps)), Steps: make([]*types.ExecutorTaskStepStatus, len(rct.Steps)),
@ -1515,6 +1516,8 @@ func (s *Scheduler) Run(ctx context.Context) error {
executorTaskHandler := api.NewExecutorTaskHandler(s.e) executorTaskHandler := api.NewExecutorTaskHandler(s.e)
executorTasksHandler := api.NewExecutorTasksHandler(s.e) executorTasksHandler := api.NewExecutorTasksHandler(s.e)
archivesHandler := api.NewArchivesHandler(logger, s.lts) archivesHandler := api.NewArchivesHandler(logger, s.lts)
cacheHandler := api.NewCacheHandler(logger, s.lts)
cacheCreateHandler := api.NewCacheCreateHandler(logger, s.lts)
// api from clients // api from clients
executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ch) executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ch)
@ -1530,6 +1533,8 @@ func (s *Scheduler) Run(ctx context.Context) error {
router := mux.NewRouter() router := mux.NewRouter()
apirouter := router.PathPrefix("/api/v1alpha").Subrouter() apirouter := router.PathPrefix("/api/v1alpha").Subrouter()
// don't return 404 on a call to an undefined handler but 400 to distinguish between a non existent resource and a wrong method
apirouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) }) apirouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) })
apirouter.Handle("/executor/{executorid}", executorStatusHandler).Methods("POST") apirouter.Handle("/executor/{executorid}", executorStatusHandler).Methods("POST")
@ -1538,6 +1543,9 @@ func (s *Scheduler) Run(ctx context.Context) error {
apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskHandler).Methods("GET") apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskHandler).Methods("GET")
apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskStatusHandler).Methods("POST") apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskStatusHandler).Methods("POST")
apirouter.Handle("/executor/archives", archivesHandler).Methods("GET") apirouter.Handle("/executor/archives", archivesHandler).Methods("GET")
apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("HEAD")
apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("GET")
apirouter.Handle("/executor/caches/{key}", cacheCreateHandler).Methods("POST")
apirouter.Handle("/logs", logsHandler).Methods("GET") apirouter.Handle("/logs", logsHandler).Methods("GET")

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"path" "path"
"reflect" "reflect"
"strings"
"github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/objectstorage"
@ -42,12 +43,12 @@ func LTSSubGroupsAndGroupTypes(group string) []string {
} }
func LTSRootGroup(group string) string { func LTSRootGroup(group string) string {
h := util.PathHierarchy(group) pl := util.PathList(group)
if len(h)%2 != 1 { if len(pl) < 2 {
panic(fmt.Errorf("wrong group path %q", group)) panic(fmt.Errorf("cannot determine root group name, wrong group path %q", group))
} }
return h[2] return pl[1]
} }
func LTSSubGroups(group string) []string { func LTSSubGroups(group string) []string {
@ -131,6 +132,19 @@ func LTSRunArchivePath(rtID string, step int) string {
return path.Join("workspacearchives", fmt.Sprintf("%s/%d.tar", rtID, step)) return path.Join("workspacearchives", fmt.Sprintf("%s/%d.tar", rtID, step))
} }
func LTSCacheDir() string {
return "caches"
}
func LTSCachePath(key string) string {
return path.Join(LTSCacheDir(), fmt.Sprintf("%s.tar", key))
}
func LTSCacheKey(p string) string {
base := path.Base(p)
return strings.TrimSuffix(base, path.Ext(base))
}
func LTSGetRunConfig(wal *wal.WalManager, runConfigID string) (*types.RunConfig, error) { func LTSGetRunConfig(wal *wal.WalManager, runConfigID string) (*types.RunConfig, error) {
runConfigPath := common.StorageRunConfigFile(runConfigID) runConfigPath := common.StorageRunConfigFile(runConfigID)
rcf, _, err := wal.ReadObject(runConfigPath, nil) rcf, _, err := wal.ReadObject(runConfigPath, nil)

View File

@ -357,11 +357,11 @@ const (
) )
type RegistryAuth struct { type RegistryAuth struct {
Type RegistryAuthType `yaml:"type"` Type RegistryAuthType `json:"type,omitempty"`
// default auth // default auth
Username string `yaml:"username"` Username string `json:"username,omitempty"`
Password string `yaml:"password"` Password string `json:"password,omitempty"`
} }
type Runtime struct { type Runtime struct {
@ -387,30 +387,42 @@ func (rct *RunConfigTask) UnmarshalJSON(b []byte) error {
} }
steps := make([]interface{}, len(st.Steps)) steps := make([]interface{}, len(st.Steps))
for i, s := range st.Steps { for i, step := range st.Steps {
var bs Step var bs Step
if err := json.Unmarshal(s, &bs); err != nil { if err := json.Unmarshal(step, &bs); err != nil {
return err return err
} }
switch bs.Type { switch bs.Type {
case "run": case "run":
var rs RunStep var s RunStep
if err := json.Unmarshal(s, &rs); err != nil { if err := json.Unmarshal(step, &s); err != nil {
return err return err
} }
steps[i] = &rs steps[i] = &s
case "save_to_workspace": case "save_to_workspace":
var rs SaveToWorkspaceStep var s SaveToWorkspaceStep
if err := json.Unmarshal(s, &rs); err != nil { if err := json.Unmarshal(step, &s); err != nil {
return err return err
} }
steps[i] = &rs steps[i] = &s
case "restore_workspace": case "restore_workspace":
var rs RestoreWorkspaceStep var s RestoreWorkspaceStep
if err := json.Unmarshal(s, &rs); err != nil { if err := json.Unmarshal(step, &s); err != nil {
return err return err
} }
steps[i] = &rs steps[i] = &s
case "save_cache":
var s SaveCacheStep
if err := json.Unmarshal(step, &s); err != nil {
return err
}
steps[i] = &s
case "restore_cache":
var s RestoreCacheStep
if err := json.Unmarshal(step, &s); err != nil {
return err
}
steps[i] = &s
} }
} }
@ -433,7 +445,7 @@ type RunStep struct {
User string `json:"user,omitempty"` User string `json:"user,omitempty"`
} }
type SaveToWorkspaceContent struct { type SaveContent struct {
SourceDir string `json:"source_dir,omitempty"` SourceDir string `json:"source_dir,omitempty"`
DestDir string `json:"dest_dir,omitempty"` DestDir string `json:"dest_dir,omitempty"`
Paths []string `json:"paths,omitempty"` Paths []string `json:"paths,omitempty"`
@ -441,7 +453,7 @@ type SaveToWorkspaceContent struct {
type SaveToWorkspaceStep struct { type SaveToWorkspaceStep struct {
Step Step
Contents []SaveToWorkspaceContent `json:"contents,omitempty"` Contents []SaveContent `json:"contents,omitempty"`
} }
type RestoreWorkspaceStep struct { type RestoreWorkspaceStep struct {
@ -449,6 +461,18 @@ type RestoreWorkspaceStep struct {
DestDir string `json:"dest_dir,omitempty"` DestDir string `json:"dest_dir,omitempty"`
} }
type SaveCacheStep struct {
Step
Key string `json:"key,omitempty"`
Contents []SaveContent `json:"contents,omitempty"`
}
type RestoreCacheStep struct {
Step
Keys []string `json:"keys,omitempty"`
DestDir string `json:"dest_dir,omitempty"`
}
type ExecutorTaskPhase string type ExecutorTaskPhase string
const ( const (
@ -474,7 +498,7 @@ type ExecutorTask struct {
WorkingDir string `json:"working_dir,omitempty"` WorkingDir string `json:"working_dir,omitempty"`
Shell string `json:"shell,omitempty"` Shell string `json:"shell,omitempty"`
User string `json:"user,omitempty"` User string `json:"user,omitempty"`
Privileged bool `yaml:"privileged"` Privileged bool `json:"privileged"`
Steps []interface{} `json:"steps,omitempty"` Steps []interface{} `json:"steps,omitempty"`
@ -484,6 +508,10 @@ type ExecutorTask struct {
Workspace Workspace `json:"workspace,omitempty"` Workspace Workspace `json:"workspace,omitempty"`
// Cache prefix to use when asking for a cache key. To isolate caches between
// groups (projects)
CachePrefix string `json:"cache_prefix,omitempty"`
// Stop is used to signal from the scheduler when the task must be stopped // Stop is used to signal from the scheduler when the task must be stopped
Stop bool `json:"stop,omitempty"` Stop bool `json:"stop,omitempty"`
} }
@ -546,30 +574,42 @@ func (et *ExecutorTask) UnmarshalJSON(b []byte) error {
} }
steps := make([]interface{}, len(ett.Steps)) steps := make([]interface{}, len(ett.Steps))
for i, s := range st.Steps { for i, step := range st.Steps {
var bs Step var bs Step
if err := json.Unmarshal(s, &bs); err != nil { if err := json.Unmarshal(step, &bs); err != nil {
return err return err
} }
switch bs.Type { switch bs.Type {
case "run": case "run":
var rs RunStep var s RunStep
if err := json.Unmarshal(s, &rs); err != nil { if err := json.Unmarshal(step, &s); err != nil {
return err return err
} }
steps[i] = &rs steps[i] = &s
case "save_to_workspace": case "save_to_workspace":
var rs SaveToWorkspaceStep var s SaveToWorkspaceStep
if err := json.Unmarshal(s, &rs); err != nil { if err := json.Unmarshal(step, &s); err != nil {
return err return err
} }
steps[i] = &rs steps[i] = &s
case "restore_workspace": case "restore_workspace":
var rs RestoreWorkspaceStep var s RestoreWorkspaceStep
if err := json.Unmarshal(s, &rs); err != nil { if err := json.Unmarshal(step, &s); err != nil {
return err return err
} }
steps[i] = &rs steps[i] = &s
case "save_cache":
var s SaveCacheStep
if err := json.Unmarshal(step, &s); err != nil {
return err
}
steps[i] = &s
case "restore_cache":
var s RestoreCacheStep
if err := json.Unmarshal(step, &s); err != nil {
return err
}
steps[i] = &s
} }
} }

36
internal/util/buffer.go Normal file
View File

@ -0,0 +1,36 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"bytes"
"io"
)
type LimitedBuffer struct {
*bytes.Buffer
cap int
}
func (b *LimitedBuffer) Write(p []byte) (n int, err error) {
if len(p)+b.Len() > b.cap {
return 0, io.EOF
}
return b.Buffer.Write(p)
}
func NewLimitedBuffer(cap int) *LimitedBuffer {
return &LimitedBuffer{Buffer: &bytes.Buffer{}, cap: cap}
}