From 12b02143b2f21fbd9c3ad88cb1edd521978c12b5 Mon Sep 17 00:00:00 2001
From: Simone Gotti
Date: Thu, 12 Sep 2019 10:36:45 +0200
Subject: [PATCH] runservice: don't save executor task data in etcd

Reorganize ExecutorTask to better distinguish between the task Spec and the
Status.

Split the task Spec into a sub-part called ExecutorTaskSpecData that contains
the task data that doesn't have to be saved in etcd: it can be very big and
can be regenerated starting from the run and the runconfig.
---
 internal/services/executor/executor.go        |  74 ++++-----
 internal/services/runservice/action/action.go |  55 ++++++
 internal/services/runservice/api/api.go       |   4 +-
 internal/services/runservice/api/executor.go  |  35 ++--
 internal/services/runservice/common/common.go | 157 ++++++++++++++++++
 internal/services/runservice/runservice.go    |   4 +-
 internal/services/runservice/scheduler.go     | 140 +++++-----------
 internal/services/runservice/store/store.go   |  54 +-----
 services/runservice/types/types.go            |  56 +++++--
 9 files changed, 349 insertions(+), 230 deletions(-)

diff --git a/internal/services/executor/executor.go b/internal/services/executor/executor.go
index f61ca4b..40eb237 100644
--- a/internal/services/executor/executor.go
+++ b/internal/services/executor/executor.go
@@ -67,9 +67,9 @@ func (e *Executor) getAllPods(ctx context.Context, all bool) ([]driver.Pod, erro
 func stepUser(t *types.ExecutorTask) string {
 	// use the container specified user and override with task user if defined
-	user := t.Containers[0].User
-	if t.User != "" {
-		user = t.User
+	user := t.Spec.Containers[0].User
+	if t.Spec.User != "" {
+		user = t.Spec.User
 	}
 
 	return user
@@ -122,8 +122,8 @@ func (e *Executor) doRunStep(ctx context.Context, s *types.RunStep, t *types.Exe
 	// TODO(sgotti) this line is used only for old runconfig versions that don't
 	// set a task default shell in the runconfig
 	shell := defaultShell
-	if t.Shell != "" {
-		shell = t.Shell
+	if t.Spec.Shell != "" {
+		shell = t.Spec.Shell
 	}
 	if s.Shell != "" {
 		shell = s.Shell
@@ -143,14 +143,14 @@ func (e *Executor) doRunStep(ctx context.Context, s *types.RunStep, t *types.Exe
 	}
 
 	// override task working dir with runstep working dir if provided
-	workingDir := t.WorkingDir
+	workingDir := t.Spec.WorkingDir
 	if s.WorkingDir != "" {
 		workingDir = s.WorkingDir
 	}
 
 	// generate the environment using the task environment and then overriding with the runstep environment
 	environment := map[string]string{}
-	for envName, envValue := range t.Environment {
+	for envName, envValue := range t.Spec.Environment {
 		environment[envName] = envValue
 	}
 	for envName, envValue := range s.Environment {
 		environment[envName] = envValue
@@ -208,15 +208,15 @@ func (e *Executor) doSaveToWorkspaceStep(ctx context.Context, s *types.SaveToWor
 	}
 	defer archivef.Close()
 
-	workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
+	workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir)
 	if err != nil {
-		_, _ = logf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
+		_, _ = logf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err))
Error: %s\n", t.Spec.WorkingDir, err)) return -1, err } execConfig := &driver.ExecConfig{ Cmd: cmd, - Env: t.Environment, + Env: t.Spec.Environment, WorkingDir: workingDir, User: stepUser(t), AttachStdin: true, @@ -278,7 +278,7 @@ func (e *Executor) expandDir(ctx context.Context, t *types.ExecutorTask, pod dri execConfig := &driver.ExecConfig{ Cmd: cmd, - Env: t.Environment, + Env: t.Spec.Environment, User: stepUser(t), AttachStdin: true, Stdout: stdout, @@ -307,7 +307,7 @@ func (e *Executor) mkdir(ctx context.Context, t *types.ExecutorTask, pod driver. execConfig := &driver.ExecConfig{ Cmd: cmd, - Env: t.Environment, + Env: t.Spec.Environment, User: stepUser(t), AttachStdin: true, Stdout: logf, @@ -336,15 +336,15 @@ func (e *Executor) template(ctx context.Context, t *types.ExecutorTask, pod driv // limit the template answer to max 1MiB stdout := util.NewLimitedBuffer(1024 * 1024) - workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) + workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir) if err != nil { - _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) + _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err)) return "", err } execConfig := &driver.ExecConfig{ Cmd: cmd, - Env: t.Environment, + Env: t.Spec.Environment, WorkingDir: workingDir, User: stepUser(t), AttachStdin: true, @@ -384,15 +384,15 @@ func (e *Executor) unarchive(ctx context.Context, t *types.ExecutorTask, source } cmd := append([]string{toolboxContainerPath, "unarchive"}, args...) - workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) + workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir) if err != nil { - _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) + _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err)) return err } execConfig := &driver.ExecConfig{ Cmd: cmd, - Env: t.Environment, + Env: t.Spec.Environment, WorkingDir: workingDir, User: stepUser(t), AttachStdin: true, @@ -432,7 +432,7 @@ func (e *Executor) doRestoreWorkspaceStep(ctx context.Context, s *types.RestoreW } defer logf.Close() - for _, op := range t.WorkspaceOperations { + for _, op := range t.Spec.WorkspaceOperations { log.Debugf("unarchiving workspace for taskID: %s, step: %d", level, op.TaskID, op.Step) resp, err := e.runserviceClient.GetArchive(ctx, op.TaskID, op.Step) if err != nil { @@ -473,7 +473,7 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep, fmt.Fprintf(logf, "cache key %q\n", userKey) // append cache prefix - key := t.CachePrefix + "-" + userKey + key := t.Spec.CachePrefix + "-" + userKey // check that the cache key doesn't already exists resp, err := e.runserviceClient.CheckCache(ctx, key, false) @@ -503,15 +503,15 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep, } defer archivef.Close() - workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) + workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir) if err != nil { - _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) + _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. 
Error: %s\n", t.Spec.WorkingDir, err)) return -1, err } execConfig := &driver.ExecConfig{ Cmd: cmd, - Env: t.Environment, + Env: t.Spec.Environment, WorkingDir: workingDir, User: stepUser(t), AttachStdin: true, @@ -605,7 +605,7 @@ func (e *Executor) doRestoreCacheStep(ctx context.Context, s *types.RestoreCache fmt.Fprintf(logf, "cache key %q\n", userKey) // append cache prefix - key := t.CachePrefix + "-" + userKey + key := t.Spec.CachePrefix + "-" + userKey resp, err := e.runserviceClient.GetCache(ctx, key, true) if err != nil { @@ -855,7 +855,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { // error out if privileged containers are required but not allowed requiresPrivilegedContainers := false - for _, c := range et.Containers { + for _, c := range et.Spec.Containers { if c.Privileged { requiresPrivilegedContainers = true break @@ -868,7 +868,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { log.Debugf("starting pod") - dockerConfig, err := registry.GenDockerConfig(et.DockerRegistriesAuth, []string{et.Containers[0].Image}) + dockerConfig, err := registry.GenDockerConfig(et.Spec.DockerRegistriesAuth, []string{et.Spec.Containers[0].Image}) if err != nil { return err } @@ -878,12 +878,12 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { // tasks failed to start and don't clash with existing pods) ID: uuid.NewV4().String(), TaskID: et.ID, - Arch: et.Arch, + Arch: et.Spec.Arch, InitVolumeDir: toolboxContainerDir, DockerConfig: dockerConfig, - Containers: make([]*driver.ContainerConfig, len(et.Containers)), + Containers: make([]*driver.ContainerConfig, len(et.Spec.Containers)), } - for i, c := range et.Containers { + for i, c := range et.Spec.Containers { var cmd []string if i == 0 { cmd = []string{toolboxContainerPath, "sleeper"} @@ -909,10 +909,10 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { } _, _ = outf.WriteString("Pod started.\n") - if et.WorkingDir != "" { - _, _ = outf.WriteString(fmt.Sprintf("Creating working dir %q.\n", et.WorkingDir)) - if err := e.mkdir(ctx, et, pod, outf, et.WorkingDir); err != nil { - _, _ = outf.WriteString(fmt.Sprintf("Failed to create working dir %q. Error: %s\n", et.WorkingDir, err)) + if et.Spec.WorkingDir != "" { + _, _ = outf.WriteString(fmt.Sprintf("Creating working dir %q.\n", et.Spec.WorkingDir)) + if err := e.mkdir(ctx, et, pod, outf, et.Spec.WorkingDir); err != nil { + _, _ = outf.WriteString(fmt.Sprintf("Failed to create working dir %q. 
Error: %s\n", et.Spec.WorkingDir, err)) return err } } @@ -922,7 +922,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { } func (e *Executor) executeTaskSteps(ctx context.Context, rt *runningTask, pod driver.Pod) (int, error) { - for i, step := range rt.et.Steps { + for i, step := range rt.et.Spec.Steps { rt.Lock() rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseRunning rt.et.Status.Steps[i].StartTime = util.TimeP(time.Now()) @@ -975,7 +975,7 @@ func (e *Executor) executeTaskSteps(ctx context.Context, rt *runningTask, pod dr rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseSuccess if err != nil { - if rt.et.Stop { + if rt.et.Spec.Stop { rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseStopped } else { rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseFailed @@ -1154,11 +1154,11 @@ func (e *Executor) tasksUpdater(ctx context.Context) error { func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) { log.Debugf("et: %v", util.Dump(et)) - if et.Status.ExecutorID != e.id { + if et.Spec.ExecutorID != e.id { return } - if et.Stop { + if et.Spec.Stop { e.stopTask(ctx, et) } diff --git a/internal/services/runservice/action/action.go b/internal/services/runservice/action/action.go index cd4da6c..9da61d3 100644 --- a/internal/services/runservice/action/action.go +++ b/internal/services/runservice/action/action.go @@ -585,3 +585,58 @@ func (h *ActionHandler) getRunCounter(ctx context.Context, group string) (uint64 return c, cgt, nil } + +func (h *ActionHandler) GetExecutorTask(ctx context.Context, etID string) (*types.ExecutorTask, error) { + et, err := store.GetExecutorTask(ctx, h.e, etID) + if err != nil && err != etcd.ErrKeyNotFound { + return nil, err + } + if et == nil { + return nil, util.NewErrNotFound(errors.Errorf("executor task %q not found", etID)) + } + + r, _, err := store.GetRun(ctx, h.e, et.Spec.RunID) + if err != nil { + return nil, errors.Errorf("cannot get run %q: %w", et.Spec.RunID, err) + } + rc, err := store.OSTGetRunConfig(h.dm, r.ID) + if err != nil { + return nil, errors.Errorf("cannot get run config %q: %w", r.ID, err) + } + rt, ok := r.Tasks[et.ID] + if !ok { + return nil, errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID) + } + + // generate ExecutorTaskSpecData + et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc) + + return et, nil +} + +func (h *ActionHandler) GetExecutorTasks(ctx context.Context, executorID string) ([]*types.ExecutorTask, error) { + ets, err := store.GetExecutorTasks(ctx, h.e, executorID) + if err != nil && err != etcd.ErrKeyNotFound { + return nil, err + } + + for _, et := range ets { + r, _, err := store.GetRun(ctx, h.e, et.Spec.RunID) + if err != nil { + return nil, errors.Errorf("cannot get run %q: %w", et.Spec.RunID, err) + } + rc, err := store.OSTGetRunConfig(h.dm, r.ID) + if err != nil { + return nil, errors.Errorf("cannot get run config %q: %w", r.ID, err) + } + rt, ok := r.Tasks[et.ID] + if !ok { + return nil, errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID) + } + + // generate ExecutorTaskSpecData + et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc) + } + + return ets, nil +} diff --git a/internal/services/runservice/api/api.go b/internal/services/runservice/api/api.go index 5accebe..27bb246 100644 --- a/internal/services/runservice/api/api.go +++ b/internal/services/runservice/api/api.go @@ -244,12 +244,12 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se if err != 
 		return err, true
 	}
-	executor, err := store.GetExecutor(ctx, h.e, et.Status.ExecutorID)
+	executor, err := store.GetExecutor(ctx, h.e, et.Spec.ExecutorID)
 	if err != nil && err != etcd.ErrKeyNotFound {
 		return err, true
 	}
 	if executor == nil {
-		return common.NewErrNotExist(errors.Errorf("executor with id %q doesn't exist", et.Status.ExecutorID)), true
+		return common.NewErrNotExist(errors.Errorf("executor with id %q doesn't exist", et.Spec.ExecutorID)), true
 	}
 
 	var url string
diff --git a/internal/services/runservice/api/executor.go b/internal/services/runservice/api/executor.go
index 5993d38..c1c7960 100644
--- a/internal/services/runservice/api/executor.go
+++ b/internal/services/runservice/api/executor.go
@@ -29,7 +29,9 @@ import (
 	"agola.io/agola/internal/services/runservice/action"
 	"agola.io/agola/internal/services/runservice/common"
 	"agola.io/agola/internal/services/runservice/store"
+	"agola.io/agola/internal/util"
 	"agola.io/agola/services/runservice/types"
+	errors "golang.org/x/xerrors"
 
 	"github.com/gorilla/mux"
 	"go.uber.org/zap"
@@ -137,11 +139,12 @@ func (h *ExecutorTaskStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Req
 }
 
 type ExecutorTaskHandler struct {
-	e *etcd.Store
+	log *zap.SugaredLogger
+	ah  *action.ActionHandler
 }
 
-func NewExecutorTaskHandler(e *etcd.Store) *ExecutorTaskHandler {
-	return &ExecutorTaskHandler{e: e}
+func NewExecutorTaskHandler(logger *zap.Logger, ah *action.ActionHandler) *ExecutorTaskHandler {
+	return &ExecutorTaskHandler{log: logger.Sugar(), ah: ah}
 }
 
 func (h *ExecutorTaskHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -151,32 +154,28 @@ func (h *ExecutorTaskHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 	// TODO(sgotti) Check authorized call from executors
 	etID := vars["taskid"]
 	if etID == "" {
-		http.Error(w, "", http.StatusBadRequest)
+		httpError(w, util.NewErrBadRequest(errors.Errorf("taskid is empty")))
 		return
 	}
 
-	et, err := store.GetExecutorTask(ctx, h.e, etID)
-	if err != nil && err != etcd.ErrKeyNotFound {
-		http.Error(w, "", http.StatusInternalServerError)
-		return
-	}
-	if et == nil {
-		http.Error(w, "", http.StatusNotFound)
+	et, err := h.ah.GetExecutorTask(ctx, etID)
+	if httpError(w, err) {
+		h.log.Errorf("err: %+v", err)
 		return
 	}
 
-	if err := json.NewEncoder(w).Encode(et); err != nil {
-		http.Error(w, "", http.StatusInternalServerError)
-		return
+	if err := httpResponse(w, http.StatusOK, et); err != nil {
+		h.log.Errorf("err: %+v", err)
 	}
 }
 
 type ExecutorTasksHandler struct {
-	e *etcd.Store
+	log *zap.SugaredLogger
+	ah  *action.ActionHandler
 }
 
-func NewExecutorTasksHandler(e *etcd.Store) *ExecutorTasksHandler {
-	return &ExecutorTasksHandler{e: e}
+func NewExecutorTasksHandler(logger *zap.Logger, ah *action.ActionHandler) *ExecutorTasksHandler {
+	return &ExecutorTasksHandler{log: logger.Sugar(), ah: ah}
 }
 
 func (h *ExecutorTasksHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -190,7 +189,7 @@ func (h *ExecutorTasksHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 		return
 	}
 
-	ets, err := store.GetExecutorTasks(ctx, h.e, executorID)
+	ets, err := h.ah.GetExecutorTasks(ctx, executorID)
 	if err != nil {
 		http.Error(w, "", http.StatusInternalServerError)
 		return
diff --git a/internal/services/runservice/common/common.go b/internal/services/runservice/common/common.go
index a94cf15..85a49f6 100644
--- a/internal/services/runservice/common/common.go
+++ b/internal/services/runservice/common/common.go
@@ -15,7 +15,13 @@
 package common
 
 import (
+	"fmt"
 	"path"
+	"sort"
+
+	"agola.io/agola/internal/runconfig"
"agola.io/agola/internal/runconfig" + "agola.io/agola/internal/util" + "agola.io/agola/services/runservice/types" ) const ( @@ -81,3 +87,154 @@ const ( DataTypeRunConfig DataType = "runconfig" DataTypeRunCounter DataType = "runcounter" ) + +func OSTSubGroupsAndGroupTypes(group string) []string { + h := util.PathHierarchy(group) + if len(h)%2 != 1 { + panic(fmt.Errorf("wrong group path %q", group)) + } + + return h +} + +func OSTRootGroup(group string) string { + pl := util.PathList(group) + if len(pl) < 2 { + panic(fmt.Errorf("cannot determine root group name, wrong group path %q", group)) + } + + return pl[1] +} + +func OSTSubGroups(group string) []string { + h := util.PathHierarchy(group) + if len(h)%2 != 1 { + panic(fmt.Errorf("wrong group path %q", group)) + } + + // remove group types + sg := []string{} + for i, g := range h { + if i%2 == 0 { + sg = append(sg, g) + } + } + + return sg +} + +func OSTSubGroupTypes(group string) []string { + h := util.PathHierarchy(group) + if len(h)%2 != 1 { + panic(fmt.Errorf("wrong group path %q", group)) + } + + // remove group names + sg := []string{} + for i, g := range h { + if i%2 == 1 { + sg = append(sg, g) + } + } + + return sg +} + +type parentsByLevelName []*types.RunConfigTask + +func (p parentsByLevelName) Len() int { return len(p) } +func (p parentsByLevelName) Less(i, j int) bool { + if p[i].Level != p[j].Level { + return p[i].Level < p[j].Level + } + return p[i].Name < p[j].Name +} +func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func mergeEnv(dest, src map[string]string) { + for k, v := range src { + dest[k] = v + } +} + +func GenExecutorTaskSpecData(r *types.Run, rt *types.RunTask, rc *types.RunConfig) *types.ExecutorTaskSpecData { + rct := rc.Tasks[rt.ID] + + environment := map[string]string{} + if rct.Environment != nil { + environment = rct.Environment + } + mergeEnv(environment, rc.StaticEnvironment) + // run config Environment variables ovverride every other environment variable + mergeEnv(environment, rc.Environment) + + cachePrefix := OSTRootGroup(r.Group) + if rc.CacheGroup != "" { + cachePrefix = rc.CacheGroup + } + + data := &types.ExecutorTaskSpecData{ + // The executorTask ID must be the same as the runTask ID so we can detect if + // there's already an executorTask scheduled for that run task and we can get + // at most once task execution + TaskName: rct.Name, + Arch: rct.Runtime.Arch, + Containers: rct.Runtime.Containers, + Environment: environment, + WorkingDir: rct.WorkingDir, + Shell: rct.Shell, + User: rct.User, + Steps: rct.Steps, + CachePrefix: cachePrefix, + DockerRegistriesAuth: rct.DockerRegistriesAuth, + } + + // calculate workspace operations + // TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer. + // this simplifies the workspaces extractions since they could be extracted in any order. 
+	wsops := []types.WorkspaceOperation{}
+	rctAllParents := runconfig.GetAllParents(rc.Tasks, rct)
+
+	// sort parents by level and name just for reproducibility
+	sort.Sort(parentsByLevelName(rctAllParents))
+
+	for _, rctParent := range rctAllParents {
+		for _, archiveStep := range r.Tasks[rctParent.ID].WorkspaceArchives {
+			wsop := types.WorkspaceOperation{TaskID: rctParent.ID, Step: archiveStep}
+			wsops = append(wsops, wsop)
+		}
+	}
+
+	data.WorkspaceOperations = wsops
+
+	return data
+}
+
+func GenExecutorTask(r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask {
+	rct := rc.Tasks[rt.ID]
+
+	et := &types.ExecutorTask{
+		// The executorTask ID must be the same as the runTask ID so we can detect if
+		// there's already an executorTask scheduled for that run task and we can get
+		// at most once task execution
+		ID: rt.ID,
+		Spec: types.ExecutorTaskSpec{
+			ExecutorID: executor.ID,
+			RunID:      r.ID,
+			// ExecutorTaskSpecData is not saved in etcd to avoid exceeding the max etcd value
+			// size but is generated every time the executor task is sent to the executor
+		},
+		Status: types.ExecutorTaskStatus{
+			Phase: types.ExecutorTaskPhaseNotStarted,
+			Steps: make([]*types.ExecutorTaskStepStatus, len(rct.Steps)),
+		},
+	}
+
+	for i := range et.Status.Steps {
+		et.Status.Steps[i] = &types.ExecutorTaskStepStatus{
+			Phase: types.ExecutorTaskPhaseNotStarted,
+		}
+	}
+
+	return et
+}
diff --git a/internal/services/runservice/runservice.go b/internal/services/runservice/runservice.go
index 231e498..1f229a5 100644
--- a/internal/services/runservice/runservice.go
+++ b/internal/services/runservice/runservice.go
@@ -211,8 +211,8 @@ func (s *Runservice) setupDefaultRouter(etCh chan *types.ExecutorTask) http.Hand
 	// executor dedicated api, only calls from executor should happen on these handlers
 	executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah)
 	executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, etCh)
-	executorTaskHandler := api.NewExecutorTaskHandler(s.e)
-	executorTasksHandler := api.NewExecutorTasksHandler(s.e)
+	executorTaskHandler := api.NewExecutorTaskHandler(logger, s.ah)
+	executorTasksHandler := api.NewExecutorTasksHandler(logger, s.ah)
 	archivesHandler := api.NewArchivesHandler(logger, s.ost)
 	cacheHandler := api.NewCacheHandler(logger, s.ost)
 	cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost)
diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go
index 6e0e1ea..76ea855 100644
--- a/internal/services/runservice/scheduler.go
+++ b/internal/services/runservice/scheduler.go
@@ -20,7 +20,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
-	"sort"
 	"strconv"
 	"time"
 
@@ -52,12 +51,6 @@ var level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
 var logger = slog.New(level)
 var log = logger.Sugar()
 
-func mergeEnv(dest, src map[string]string) {
-	for k, v := range src {
-		dest[k] = v
-	}
-}
-
 func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) {
 	// the real source of active tasks is the number of executor tasks in etcd
 	// we can't rely on RunTask.Status since it's only updated when receiveing
@@ -255,7 +248,7 @@ func (s *Runservice) submitRunTasks(ctx context.Context, r *types.Run, rc *types
 			return nil
 		}
 
-		et := s.genExecutorTask(ctx, r, rt, rc, executor)
+		et := common.GenExecutorTask(r, rt, rc, executor)
 		log.Debugf("et: %s", util.Dump(et))
 
 		// check that the executorTask wasn't already scheduled
@@ -333,107 +326,48 @@ func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *type
 	return nil
 }
 
-type parentsByLevelName []*types.RunConfigTask
-
-func (p parentsByLevelName) Len() int { return len(p) }
-func (p parentsByLevelName) Less(i, j int) bool {
-	if p[i].Level != p[j].Level {
-		return p[i].Level < p[j].Level
-	}
-	return p[i].Name < p[j].Name
-}
-func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
-
-func (s *Runservice) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask {
-	rct := rc.Tasks[rt.ID]
-
-	environment := map[string]string{}
-	if rct.Environment != nil {
-		environment = rct.Environment
-	}
-	mergeEnv(environment, rc.StaticEnvironment)
-	// run config Environment variables ovverride every other environment variable
-	mergeEnv(environment, rc.Environment)
-
-	cachePrefix := store.OSTRootGroup(r.Group)
-	if rc.CacheGroup != "" {
-		cachePrefix = rc.CacheGroup
-	}
-
-	et := &types.ExecutorTask{
-		// The executorTask ID must be the same as the runTask ID so we can detect if
-		// there's already an executorTask scheduled for that run task and we can get
-		// at most once task execution
-		ID:          rt.ID,
-		RunID:       r.ID,
-		TaskName:    rct.Name,
-		Arch:        rct.Runtime.Arch,
-		Containers:  rct.Runtime.Containers,
-		Environment: environment,
-		WorkingDir:  rct.WorkingDir,
-		Shell:       rct.Shell,
-		User:        rct.User,
-		Steps:       rct.Steps,
-		CachePrefix: cachePrefix,
-		Status: types.ExecutorTaskStatus{
-			Phase:      types.ExecutorTaskPhaseNotStarted,
-			Steps:      make([]*types.ExecutorTaskStepStatus, len(rct.Steps)),
-			ExecutorID: executor.ID,
-		},
-		DockerRegistriesAuth: rct.DockerRegistriesAuth,
-	}
-
-	for i := range et.Status.Steps {
-		et.Status.Steps[i] = &types.ExecutorTaskStepStatus{
-			Phase: types.ExecutorTaskPhaseNotStarted,
-		}
-	}
-
-	// calculate workspace operations
-	// TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer.
-	// this simplifies the workspaces extractions since they could be extracted in any order. We make them ordered just for reproducibility
-	wsops := []types.WorkspaceOperation{}
-	rctAllParents := runconfig.GetAllParents(rc.Tasks, rct)
-
-	// sort parents by level and name just for reproducibility
-	sort.Sort(parentsByLevelName(rctAllParents))
-
-	for _, rctParent := range rctAllParents {
-		log.Debugf("rctParent: %s", util.Dump(rctParent))
-		for _, archiveStep := range r.Tasks[rctParent.ID].WorkspaceArchives {
-			wsop := types.WorkspaceOperation{TaskID: rctParent.ID, Step: archiveStep}
-			wsops = append(wsops, wsop)
-		}
-	}
-
-	et.WorkspaceOperations = wsops
-
-	return et
-}
-
 // sendExecutorTask sends executor task to executor, if this fails the executor
 // will periodically fetch the executortask anyway
 func (s *Runservice) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error {
-	executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID)
+	executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID)
 	if err != nil && err != etcd.ErrKeyNotFound {
 		return err
 	}
 	if executor == nil {
-		log.Warnf("executor with id %q doesn't exist", et.Status.ExecutorID)
+		log.Warnf("executor with id %q doesn't exist", et.Spec.ExecutorID)
 		return nil
 	}
 
+	r, _, err := store.GetRun(ctx, s.e, et.Spec.RunID)
+	if err != nil {
+		return err
+	}
+	rc, err := store.OSTGetRunConfig(s.dm, r.ID)
+	if err != nil {
+		return errors.Errorf("cannot get run config %q: %w", r.ID, err)
+	}
+	rt, ok := r.Tasks[et.ID]
+	if !ok {
+		return errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID)
+	}
+
+	// take a copy to not change the input executorTask
+	et = et.DeepCopy()
+
+	// generate ExecutorTaskSpecData
+	et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc)
+
 	etj, err := json.Marshal(et)
 	if err != nil {
 		return err
 	}
 
-	r, err := http.Post(executor.ListenURL+"/api/v1alpha/executor", "", bytes.NewReader(etj))
+	req, err := http.Post(executor.ListenURL+"/api/v1alpha/executor", "", bytes.NewReader(etj))
 	if err != nil {
 		return err
 	}
-	if r.StatusCode != http.StatusOK {
-		return errors.Errorf("received http status: %d", r.StatusCode)
+	if req.StatusCode != http.StatusOK {
+		return errors.Errorf("received http status: %d", req.StatusCode)
 	}
 
 	return nil
@@ -549,7 +483,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
 	// if the run is set to stop, stop all tasks
 	if r.Stop {
 		for _, et := range activeExecutorTasks {
-			et.Stop = true
+			et.Spec.Stop = true
 			if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
 				return err
 			}
@@ -664,7 +598,7 @@ func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeEx
 }
 
 func (s *Runservice) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error {
-	r, _, err := store.GetRun(ctx, s.e, et.RunID)
+	r, _, err := store.GetRun(ctx, s.e, et.Spec.RunID)
 	if err != nil {
 		return err
 	}
@@ -819,7 +753,7 @@ func (s *Runservice) executorTasksCleaner(ctx context.Context) error {
 func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.ExecutorTask) error {
 	log.Debugf("et: %s", util.Dump(et))
 	if et.Status.Phase.IsFinished() {
-		r, _, err := store.GetRun(ctx, s.e, et.RunID)
+		r, _, err := store.GetRun(ctx, s.e, et.Spec.RunID)
 		if err != nil {
 			if err == etcd.ErrKeyNotFound {
 				// run doesn't exists, remove executor task
@@ -835,8 +769,8 @@ func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.Executor
 
 		if r.Phase.IsFinished() {
 			// if the run is finished mark the executor tasks to stop
-			if !et.Stop {
-				et.Stop = true
+			if !et.Spec.Stop {
+				et.Spec.Stop = true
 				if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
 					return err
 				}
@@ -850,13 +784,13 @@ func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.Executor
 
 	if !et.Status.Phase.IsFinished() {
 		// if the executor doesn't exists anymore mark the not finished executor tasks as failed
-		executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID)
+		executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID)
 		if err != nil && err != etcd.ErrKeyNotFound {
 			return err
 		}
 		if executor == nil {
-			log.Warnf("executor with id %q doesn't exist. marking executor task %q as failed", et.Status.ExecutorID, et.ID)
-			et.FailError = "executor deleted"
+			log.Warnf("executor with id %q doesn't exist. marking executor task %q as failed", et.Spec.ExecutorID, et.ID)
+			et.Status.FailError = "executor deleted"
 			et.Status.Phase = types.ExecutorTaskPhaseFailed
 			et.Status.EndTime = util.TimeP(time.Now())
 			for _, s := range et.Status.Steps {
@@ -947,12 +881,12 @@ func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool
 		}
 		return nil
 	}
-	executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID)
+	executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID)
 	if err != nil && err != etcd.ErrKeyNotFound {
 		return err
 	}
 	if executor == nil {
-		log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Status.ExecutorID)
+		log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Spec.ExecutorID)
 		return nil
 	}
 
@@ -1107,12 +1041,12 @@ func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnu
 		log.Errorf("executor task with id %q doesn't exist. This shouldn't happen. Skipping fetching", rt.ID)
 		return nil
 	}
-	executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID)
+	executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID)
 	if err != nil && err != etcd.ErrKeyNotFound {
 		return err
 	}
 	if executor == nil {
-		log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Status.ExecutorID)
+		log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Spec.ExecutorID)
Skipping fetching", et.Spec.ExecutorID) return nil } diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index a54bcf1..207c8d6 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -37,58 +37,6 @@ const ( MaxChangegroupNameLength = 256 ) -func OSTSubGroupsAndGroupTypes(group string) []string { - h := util.PathHierarchy(group) - if len(h)%2 != 1 { - panic(fmt.Errorf("wrong group path %q", group)) - } - - return h -} - -func OSTRootGroup(group string) string { - pl := util.PathList(group) - if len(pl) < 2 { - panic(fmt.Errorf("cannot determine root group name, wrong group path %q", group)) - } - - return pl[1] -} - -func OSTSubGroups(group string) []string { - h := util.PathHierarchy(group) - if len(h)%2 != 1 { - panic(fmt.Errorf("wrong group path %q", group)) - } - - // remove group types - sg := []string{} - for i, g := range h { - if i%2 == 0 { - sg = append(sg, g) - } - } - - return sg -} - -func OSTSubGroupTypes(group string) []string { - h := util.PathHierarchy(group) - if len(h)%2 != 1 { - panic(fmt.Errorf("wrong group path %q", group)) - } - - // remove group names - sg := []string{} - for i, g := range h { - if i%2 == 1 { - sg = append(sg, g) - } - } - - return sg -} - func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*datamanager.Action, error) { // use the first group dir after the root pl := util.PathList(group) @@ -364,7 +312,7 @@ func GetExecutorTasks(ctx context.Context, e *etcd.Store, executorID string) ([] return nil, err } et.Revision = kv.ModRevision - if et.Status.ExecutorID == executorID { + if et.Spec.ExecutorID == executorID { ets = append(ets, et) } } diff --git a/services/runservice/types/types.go b/services/runservice/types/types.go index 2d8e57f..45987af 100644 --- a/services/runservice/types/types.go +++ b/services/runservice/types/types.go @@ -459,9 +459,38 @@ func (s ExecutorTaskPhase) IsFinished() bool { } type ExecutorTask struct { - Revision int64 `json:"revision,omitempty"` - ID string `json:"id,omitempty"` - RunID string `json:"run_id,omitempty"` + ID string `json:"id,omitempty"` + + Spec ExecutorTaskSpec `json:"spec,omitempty"` + + Status ExecutorTaskStatus `json:"status,omitempty"` + + // internal values not saved + Revision int64 `json:"-"` +} + +func (et *ExecutorTask) DeepCopy() *ExecutorTask { + net, err := copystructure.Copy(et) + if err != nil { + panic(err) + } + return net.(*ExecutorTask) +} + +type ExecutorTaskSpec struct { + ExecutorID string `json:"executor_id,omitempty"` + RunID string `json:"run_id,omitempty"` + + // Stop is used to signal from the scheduler when the task must be stopped + Stop bool `json:"stop,omitempty"` + + *ExecutorTaskSpecData +} + +// ExecutorTaskSpecData defines the task data required to execute the tasks. 
+// values are not saved in etcd to avoid exceeding the max etcd value size but
+// are generated every time they are sent to the executor
+type ExecutorTaskSpecData struct {
 	TaskName   string       `json:"task_name,omitempty"`
 	Arch       types.Arch   `json:"arch,omitempty"`
 	Containers []*Container `json:"containers,omitempty"`
@@ -471,27 +500,24 @@ type ExecutorTask struct {
 	User       string `json:"user,omitempty"`
 	Privileged bool   `json:"privileged"`
 
-	DockerRegistriesAuth map[string]DockerRegistryAuth `json:"docker_registries_auth"`
-
-	Steps Steps `json:"steps,omitempty"`
-
-	Status     ExecutorTaskStatus `json:"status,omitempty"`
-	SetupError string             `fail_reason:"setup_error,omitempty"`
-	FailError  string             `fail_reason:"fail_error,omitempty"`
-
 	WorkspaceOperations []WorkspaceOperation `json:"workspace_operations,omitempty"`
 
+	DockerRegistriesAuth map[string]DockerRegistryAuth `json:"docker_registries_auth"`
+
 	// Cache prefix to use when asking for a cache key. To isolate caches between
 	// groups (projects)
 	CachePrefix string `json:"cache_prefix,omitempty"`
 
-	// Stop is used to signal from the scheduler when the task must be stopped
-	Stop bool `json:"stop,omitempty"`
+	Steps Steps `json:"steps,omitempty"`
 }
 
 type ExecutorTaskStatus struct {
-	ExecutorID string            `json:"executor_id,omitempty"`
-	Phase      ExecutorTaskPhase `json:"phase,omitempty"`
+	ID       string `json:"id,omitempty"`
+	Revision int64  `json:"revision,omitempty"`
+
+	Phase ExecutorTaskPhase `json:"phase,omitempty"`
+
+	FailError string `json:"fail_error,omitempty"`
 
 	SetupStep ExecutorTaskStepStatus    `json:"setup_step,omitempty"`
 	Steps     []*ExecutorTaskStepStatus `json:"steps,omitempty"`
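
For reference, the regenerate-on-send flow this patch introduces, condensed
into one place. This is a sketch assembled from GenExecutorTask,
AtomicPutExecutorTask and sendExecutorTask in the diff above, with error
handling elided; it is not code added by the patch itself:

	// Only the slim ExecutorTask (ID + Spec + Status) is stored in etcd;
	// Spec.ExecutorTaskSpecData is left nil at scheduling time.
	et := common.GenExecutorTask(r, rt, rc, executor)
	_, _ = store.AtomicPutExecutorTask(ctx, s.e, et)

	// Right before contacting the executor, the large, derivable part of the
	// Spec is rebuilt from the run and the runconfig instead of being persisted.
	et = et.DeepCopy() // don't mutate the stored copy
	et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc)
	etj, _ := json.Marshal(et)
	_, _ = http.Post(executor.ListenURL+"/api/v1alpha/executor", "", bytes.NewReader(etj))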