diff --git a/internal/services/executor/executor.go b/internal/services/executor/executor.go index f61ca4b..40eb237 100644 --- a/internal/services/executor/executor.go +++ b/internal/services/executor/executor.go @@ -67,9 +67,9 @@ func (e *Executor) getAllPods(ctx context.Context, all bool) ([]driver.Pod, erro func stepUser(t *types.ExecutorTask) string { // use the container specified user and override with task user if defined - user := t.Containers[0].User - if t.User != "" { - user = t.User + user := t.Spec.Containers[0].User + if t.Spec.User != "" { + user = t.Spec.User } return user @@ -122,8 +122,8 @@ func (e *Executor) doRunStep(ctx context.Context, s *types.RunStep, t *types.Exe // TODO(sgotti) this line is used only for old runconfig versions that don't // set a task default shell in the runconfig shell := defaultShell - if t.Shell != "" { - shell = t.Shell + if t.Spec.Shell != "" { + shell = t.Spec.Shell } if s.Shell != "" { shell = s.Shell @@ -143,14 +143,14 @@ func (e *Executor) doRunStep(ctx context.Context, s *types.RunStep, t *types.Exe } // override task working dir with runstep working dir if provided - workingDir := t.WorkingDir + workingDir := t.Spec.WorkingDir if s.WorkingDir != "" { workingDir = s.WorkingDir } // generate the environment using the task environment and then overriding with the runstep environment environment := map[string]string{} - for envName, envValue := range t.Environment { + for envName, envValue := range t.Spec.Environment { environment[envName] = envValue } for envName, envValue := range s.Environment { @@ -208,15 +208,15 @@ func (e *Executor) doSaveToWorkspaceStep(ctx context.Context, s *types.SaveToWor } defer archivef.Close() - workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) + workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir) if err != nil { - _, _ = logf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) + _, _ = logf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err)) return -1, err } execConfig := &driver.ExecConfig{ Cmd: cmd, - Env: t.Environment, + Env: t.Spec.Environment, WorkingDir: workingDir, User: stepUser(t), AttachStdin: true, @@ -278,7 +278,7 @@ func (e *Executor) expandDir(ctx context.Context, t *types.ExecutorTask, pod dri execConfig := &driver.ExecConfig{ Cmd: cmd, - Env: t.Environment, + Env: t.Spec.Environment, User: stepUser(t), AttachStdin: true, Stdout: stdout, @@ -307,7 +307,7 @@ func (e *Executor) mkdir(ctx context.Context, t *types.ExecutorTask, pod driver. execConfig := &driver.ExecConfig{ Cmd: cmd, - Env: t.Environment, + Env: t.Spec.Environment, User: stepUser(t), AttachStdin: true, Stdout: logf, @@ -336,15 +336,15 @@ func (e *Executor) template(ctx context.Context, t *types.ExecutorTask, pod driv // limit the template answer to max 1MiB stdout := util.NewLimitedBuffer(1024 * 1024) - workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) + workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir) if err != nil { - _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) + _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err)) return "", err } execConfig := &driver.ExecConfig{ Cmd: cmd, - Env: t.Environment, + Env: t.Spec.Environment, WorkingDir: workingDir, User: stepUser(t), AttachStdin: true, @@ -384,15 +384,15 @@ func (e *Executor) unarchive(ctx context.Context, t *types.ExecutorTask, source } cmd := append([]string{toolboxContainerPath, "unarchive"}, args...) - workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) + workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir) if err != nil { - _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) + _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err)) return err } execConfig := &driver.ExecConfig{ Cmd: cmd, - Env: t.Environment, + Env: t.Spec.Environment, WorkingDir: workingDir, User: stepUser(t), AttachStdin: true, @@ -432,7 +432,7 @@ func (e *Executor) doRestoreWorkspaceStep(ctx context.Context, s *types.RestoreW } defer logf.Close() - for _, op := range t.WorkspaceOperations { + for _, op := range t.Spec.WorkspaceOperations { log.Debugf("unarchiving workspace for taskID: %s, step: %d", level, op.TaskID, op.Step) resp, err := e.runserviceClient.GetArchive(ctx, op.TaskID, op.Step) if err != nil { @@ -473,7 +473,7 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep, fmt.Fprintf(logf, "cache key %q\n", userKey) // append cache prefix - key := t.CachePrefix + "-" + userKey + key := t.Spec.CachePrefix + "-" + userKey // check that the cache key doesn't already exists resp, err := e.runserviceClient.CheckCache(ctx, key, false) @@ -503,15 +503,15 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep, } defer archivef.Close() - workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) + workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir) if err != nil { - _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) + _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err)) return -1, err } execConfig := &driver.ExecConfig{ Cmd: cmd, - Env: t.Environment, + Env: t.Spec.Environment, WorkingDir: workingDir, User: stepUser(t), AttachStdin: true, @@ -605,7 +605,7 @@ func (e *Executor) doRestoreCacheStep(ctx context.Context, s *types.RestoreCache fmt.Fprintf(logf, "cache key %q\n", userKey) // append cache prefix - key := t.CachePrefix + "-" + userKey + key := t.Spec.CachePrefix + "-" + userKey resp, err := e.runserviceClient.GetCache(ctx, key, true) if err != nil { @@ -855,7 +855,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { // error out if privileged containers are required but not allowed requiresPrivilegedContainers := false - for _, c := range et.Containers { + for _, c := range et.Spec.Containers { if c.Privileged { requiresPrivilegedContainers = true break @@ -868,7 +868,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { log.Debugf("starting pod") - dockerConfig, err := registry.GenDockerConfig(et.DockerRegistriesAuth, []string{et.Containers[0].Image}) + dockerConfig, err := registry.GenDockerConfig(et.Spec.DockerRegistriesAuth, []string{et.Spec.Containers[0].Image}) if err != nil { return err } @@ -878,12 +878,12 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { // tasks failed to start and don't clash with existing pods) ID: uuid.NewV4().String(), TaskID: et.ID, - Arch: et.Arch, + Arch: et.Spec.Arch, InitVolumeDir: toolboxContainerDir, DockerConfig: dockerConfig, - Containers: make([]*driver.ContainerConfig, len(et.Containers)), + Containers: make([]*driver.ContainerConfig, len(et.Spec.Containers)), } - for i, c := range et.Containers { + for i, c := range et.Spec.Containers { var cmd []string if i == 0 { cmd = []string{toolboxContainerPath, "sleeper"} @@ -909,10 +909,10 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { } _, _ = outf.WriteString("Pod started.\n") - if et.WorkingDir != "" { - _, _ = outf.WriteString(fmt.Sprintf("Creating working dir %q.\n", et.WorkingDir)) - if err := e.mkdir(ctx, et, pod, outf, et.WorkingDir); err != nil { - _, _ = outf.WriteString(fmt.Sprintf("Failed to create working dir %q. Error: %s\n", et.WorkingDir, err)) + if et.Spec.WorkingDir != "" { + _, _ = outf.WriteString(fmt.Sprintf("Creating working dir %q.\n", et.Spec.WorkingDir)) + if err := e.mkdir(ctx, et, pod, outf, et.Spec.WorkingDir); err != nil { + _, _ = outf.WriteString(fmt.Sprintf("Failed to create working dir %q. Error: %s\n", et.Spec.WorkingDir, err)) return err } } @@ -922,7 +922,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { } func (e *Executor) executeTaskSteps(ctx context.Context, rt *runningTask, pod driver.Pod) (int, error) { - for i, step := range rt.et.Steps { + for i, step := range rt.et.Spec.Steps { rt.Lock() rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseRunning rt.et.Status.Steps[i].StartTime = util.TimeP(time.Now()) @@ -975,7 +975,7 @@ func (e *Executor) executeTaskSteps(ctx context.Context, rt *runningTask, pod dr rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseSuccess if err != nil { - if rt.et.Stop { + if rt.et.Spec.Stop { rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseStopped } else { rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseFailed @@ -1154,11 +1154,11 @@ func (e *Executor) tasksUpdater(ctx context.Context) error { func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) { log.Debugf("et: %v", util.Dump(et)) - if et.Status.ExecutorID != e.id { + if et.Spec.ExecutorID != e.id { return } - if et.Stop { + if et.Spec.Stop { e.stopTask(ctx, et) } diff --git a/internal/services/runservice/action/action.go b/internal/services/runservice/action/action.go index cd4da6c..9da61d3 100644 --- a/internal/services/runservice/action/action.go +++ b/internal/services/runservice/action/action.go @@ -585,3 +585,58 @@ func (h *ActionHandler) getRunCounter(ctx context.Context, group string) (uint64 return c, cgt, nil } + +func (h *ActionHandler) GetExecutorTask(ctx context.Context, etID string) (*types.ExecutorTask, error) { + et, err := store.GetExecutorTask(ctx, h.e, etID) + if err != nil && err != etcd.ErrKeyNotFound { + return nil, err + } + if et == nil { + return nil, util.NewErrNotFound(errors.Errorf("executor task %q not found", etID)) + } + + r, _, err := store.GetRun(ctx, h.e, et.Spec.RunID) + if err != nil { + return nil, errors.Errorf("cannot get run %q: %w", et.Spec.RunID, err) + } + rc, err := store.OSTGetRunConfig(h.dm, r.ID) + if err != nil { + return nil, errors.Errorf("cannot get run config %q: %w", r.ID, err) + } + rt, ok := r.Tasks[et.ID] + if !ok { + return nil, errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID) + } + + // generate ExecutorTaskSpecData + et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc) + + return et, nil +} + +func (h *ActionHandler) GetExecutorTasks(ctx context.Context, executorID string) ([]*types.ExecutorTask, error) { + ets, err := store.GetExecutorTasks(ctx, h.e, executorID) + if err != nil && err != etcd.ErrKeyNotFound { + return nil, err + } + + for _, et := range ets { + r, _, err := store.GetRun(ctx, h.e, et.Spec.RunID) + if err != nil { + return nil, errors.Errorf("cannot get run %q: %w", et.Spec.RunID, err) + } + rc, err := store.OSTGetRunConfig(h.dm, r.ID) + if err != nil { + return nil, errors.Errorf("cannot get run config %q: %w", r.ID, err) + } + rt, ok := r.Tasks[et.ID] + if !ok { + return nil, errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID) + } + + // generate ExecutorTaskSpecData + et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc) + } + + return ets, nil +} diff --git a/internal/services/runservice/api/api.go b/internal/services/runservice/api/api.go index 5accebe..27bb246 100644 --- a/internal/services/runservice/api/api.go +++ b/internal/services/runservice/api/api.go @@ -244,12 +244,12 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se if err != nil { return err, true } - executor, err := store.GetExecutor(ctx, h.e, et.Status.ExecutorID) + executor, err := store.GetExecutor(ctx, h.e, et.Spec.ExecutorID) if err != nil && err != etcd.ErrKeyNotFound { return err, true } if executor == nil { - return common.NewErrNotExist(errors.Errorf("executor with id %q doesn't exist", et.Status.ExecutorID)), true + return common.NewErrNotExist(errors.Errorf("executor with id %q doesn't exist", et.Spec.ExecutorID)), true } var url string diff --git a/internal/services/runservice/api/executor.go b/internal/services/runservice/api/executor.go index 5993d38..c1c7960 100644 --- a/internal/services/runservice/api/executor.go +++ b/internal/services/runservice/api/executor.go @@ -29,7 +29,9 @@ import ( "agola.io/agola/internal/services/runservice/action" "agola.io/agola/internal/services/runservice/common" "agola.io/agola/internal/services/runservice/store" + "agola.io/agola/internal/util" "agola.io/agola/services/runservice/types" + errors "golang.org/x/xerrors" "github.com/gorilla/mux" "go.uber.org/zap" @@ -137,11 +139,12 @@ func (h *ExecutorTaskStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Req } type ExecutorTaskHandler struct { - e *etcd.Store + log *zap.SugaredLogger + ah *action.ActionHandler } -func NewExecutorTaskHandler(e *etcd.Store) *ExecutorTaskHandler { - return &ExecutorTaskHandler{e: e} +func NewExecutorTaskHandler(logger *zap.Logger, ah *action.ActionHandler) *ExecutorTaskHandler { + return &ExecutorTaskHandler{log: logger.Sugar(), ah: ah} } func (h *ExecutorTaskHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -151,32 +154,28 @@ func (h *ExecutorTaskHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) // TODO(sgotti) Check authorized call from executors etID := vars["taskid"] if etID == "" { - http.Error(w, "", http.StatusBadRequest) + httpError(w, util.NewErrBadRequest(errors.Errorf("taskid is empty"))) return } - et, err := store.GetExecutorTask(ctx, h.e, etID) - if err != nil && err != etcd.ErrKeyNotFound { - http.Error(w, "", http.StatusInternalServerError) - return - } - if et == nil { - http.Error(w, "", http.StatusNotFound) + et, err := h.ah.GetExecutorTask(ctx, etID) + if httpError(w, err) { + h.log.Errorf("err: %+v", err) return } - if err := json.NewEncoder(w).Encode(et); err != nil { - http.Error(w, "", http.StatusInternalServerError) - return + if err := httpResponse(w, http.StatusOK, et); err != nil { + h.log.Errorf("err: %+v", err) } } type ExecutorTasksHandler struct { - e *etcd.Store + log *zap.SugaredLogger + ah *action.ActionHandler } -func NewExecutorTasksHandler(e *etcd.Store) *ExecutorTasksHandler { - return &ExecutorTasksHandler{e: e} +func NewExecutorTasksHandler(logger *zap.Logger, ah *action.ActionHandler) *ExecutorTasksHandler { + return &ExecutorTasksHandler{log: logger.Sugar(), ah: ah} } func (h *ExecutorTasksHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -190,7 +189,7 @@ func (h *ExecutorTasksHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) return } - ets, err := store.GetExecutorTasks(ctx, h.e, executorID) + ets, err := h.ah.GetExecutorTasks(ctx, executorID) if err != nil { http.Error(w, "", http.StatusInternalServerError) return diff --git a/internal/services/runservice/common/common.go b/internal/services/runservice/common/common.go index a94cf15..85a49f6 100644 --- a/internal/services/runservice/common/common.go +++ b/internal/services/runservice/common/common.go @@ -15,7 +15,13 @@ package common import ( + "fmt" "path" + "sort" + + "agola.io/agola/internal/runconfig" + "agola.io/agola/internal/util" + "agola.io/agola/services/runservice/types" ) const ( @@ -81,3 +87,154 @@ const ( DataTypeRunConfig DataType = "runconfig" DataTypeRunCounter DataType = "runcounter" ) + +func OSTSubGroupsAndGroupTypes(group string) []string { + h := util.PathHierarchy(group) + if len(h)%2 != 1 { + panic(fmt.Errorf("wrong group path %q", group)) + } + + return h +} + +func OSTRootGroup(group string) string { + pl := util.PathList(group) + if len(pl) < 2 { + panic(fmt.Errorf("cannot determine root group name, wrong group path %q", group)) + } + + return pl[1] +} + +func OSTSubGroups(group string) []string { + h := util.PathHierarchy(group) + if len(h)%2 != 1 { + panic(fmt.Errorf("wrong group path %q", group)) + } + + // remove group types + sg := []string{} + for i, g := range h { + if i%2 == 0 { + sg = append(sg, g) + } + } + + return sg +} + +func OSTSubGroupTypes(group string) []string { + h := util.PathHierarchy(group) + if len(h)%2 != 1 { + panic(fmt.Errorf("wrong group path %q", group)) + } + + // remove group names + sg := []string{} + for i, g := range h { + if i%2 == 1 { + sg = append(sg, g) + } + } + + return sg +} + +type parentsByLevelName []*types.RunConfigTask + +func (p parentsByLevelName) Len() int { return len(p) } +func (p parentsByLevelName) Less(i, j int) bool { + if p[i].Level != p[j].Level { + return p[i].Level < p[j].Level + } + return p[i].Name < p[j].Name +} +func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func mergeEnv(dest, src map[string]string) { + for k, v := range src { + dest[k] = v + } +} + +func GenExecutorTaskSpecData(r *types.Run, rt *types.RunTask, rc *types.RunConfig) *types.ExecutorTaskSpecData { + rct := rc.Tasks[rt.ID] + + environment := map[string]string{} + if rct.Environment != nil { + environment = rct.Environment + } + mergeEnv(environment, rc.StaticEnvironment) + // run config Environment variables ovverride every other environment variable + mergeEnv(environment, rc.Environment) + + cachePrefix := OSTRootGroup(r.Group) + if rc.CacheGroup != "" { + cachePrefix = rc.CacheGroup + } + + data := &types.ExecutorTaskSpecData{ + // The executorTask ID must be the same as the runTask ID so we can detect if + // there's already an executorTask scheduled for that run task and we can get + // at most once task execution + TaskName: rct.Name, + Arch: rct.Runtime.Arch, + Containers: rct.Runtime.Containers, + Environment: environment, + WorkingDir: rct.WorkingDir, + Shell: rct.Shell, + User: rct.User, + Steps: rct.Steps, + CachePrefix: cachePrefix, + DockerRegistriesAuth: rct.DockerRegistriesAuth, + } + + // calculate workspace operations + // TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer. + // this simplifies the workspaces extractions since they could be extracted in any order. We make them ordered just for reproducibility + wsops := []types.WorkspaceOperation{} + rctAllParents := runconfig.GetAllParents(rc.Tasks, rct) + + // sort parents by level and name just for reproducibility + sort.Sort(parentsByLevelName(rctAllParents)) + + for _, rctParent := range rctAllParents { + for _, archiveStep := range r.Tasks[rctParent.ID].WorkspaceArchives { + wsop := types.WorkspaceOperation{TaskID: rctParent.ID, Step: archiveStep} + wsops = append(wsops, wsop) + } + } + + data.WorkspaceOperations = wsops + + return data +} + +func GenExecutorTask(r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask { + rct := rc.Tasks[rt.ID] + + et := &types.ExecutorTask{ + // The executorTask ID must be the same as the runTask ID so we can detect if + // there's already an executorTask scheduled for that run task and we can get + // at most once task execution + ID: rt.ID, + Spec: types.ExecutorTaskSpec{ + ExecutorID: executor.ID, + RunID: r.ID, + // ExecutorTaskSpecData is not saved in etcd to avoid exceeding the max etcd value + // size but is generated everytime the executor task is sent to the executor + }, + Status: types.ExecutorTaskStatus{ + Phase: types.ExecutorTaskPhaseNotStarted, + Steps: make([]*types.ExecutorTaskStepStatus, len(rct.Steps)), + }, + } + + for i := range et.Status.Steps { + et.Status.Steps[i] = &types.ExecutorTaskStepStatus{ + Phase: types.ExecutorTaskPhaseNotStarted, + } + } + + return et +} diff --git a/internal/services/runservice/runservice.go b/internal/services/runservice/runservice.go index 231e498..1f229a5 100644 --- a/internal/services/runservice/runservice.go +++ b/internal/services/runservice/runservice.go @@ -211,8 +211,8 @@ func (s *Runservice) setupDefaultRouter(etCh chan *types.ExecutorTask) http.Hand // executor dedicated api, only calls from executor should happen on these handlers executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah) executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, etCh) - executorTaskHandler := api.NewExecutorTaskHandler(s.e) - executorTasksHandler := api.NewExecutorTasksHandler(s.e) + executorTaskHandler := api.NewExecutorTaskHandler(logger, s.ah) + executorTasksHandler := api.NewExecutorTasksHandler(logger, s.ah) archivesHandler := api.NewArchivesHandler(logger, s.ost) cacheHandler := api.NewCacheHandler(logger, s.ost) cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost) diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 6e0e1ea..76ea855 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" "net/http" - "sort" "strconv" "time" @@ -52,12 +51,6 @@ var level = zap.NewAtomicLevelAt(zapcore.InfoLevel) var logger = slog.New(level) var log = logger.Sugar() -func mergeEnv(dest, src map[string]string) { - for k, v := range src { - dest[k] = v - } -} - func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) { // the real source of active tasks is the number of executor tasks in etcd // we can't rely on RunTask.Status since it's only updated when receiveing @@ -255,7 +248,7 @@ func (s *Runservice) submitRunTasks(ctx context.Context, r *types.Run, rc *types return nil } - et := s.genExecutorTask(ctx, r, rt, rc, executor) + et := common.GenExecutorTask(r, rt, rc, executor) log.Debugf("et: %s", util.Dump(et)) // check that the executorTask wasn't already scheduled @@ -333,107 +326,48 @@ func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *type return nil } -type parentsByLevelName []*types.RunConfigTask - -func (p parentsByLevelName) Len() int { return len(p) } -func (p parentsByLevelName) Less(i, j int) bool { - if p[i].Level != p[j].Level { - return p[i].Level < p[j].Level - } - return p[i].Name < p[j].Name -} -func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -func (s *Runservice) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask { - rct := rc.Tasks[rt.ID] - - environment := map[string]string{} - if rct.Environment != nil { - environment = rct.Environment - } - mergeEnv(environment, rc.StaticEnvironment) - // run config Environment variables ovverride every other environment variable - mergeEnv(environment, rc.Environment) - - cachePrefix := store.OSTRootGroup(r.Group) - if rc.CacheGroup != "" { - cachePrefix = rc.CacheGroup - } - - et := &types.ExecutorTask{ - // The executorTask ID must be the same as the runTask ID so we can detect if - // there's already an executorTask scheduled for that run task and we can get - // at most once task execution - ID: rt.ID, - RunID: r.ID, - TaskName: rct.Name, - Arch: rct.Runtime.Arch, - Containers: rct.Runtime.Containers, - Environment: environment, - WorkingDir: rct.WorkingDir, - Shell: rct.Shell, - User: rct.User, - Steps: rct.Steps, - CachePrefix: cachePrefix, - Status: types.ExecutorTaskStatus{ - Phase: types.ExecutorTaskPhaseNotStarted, - Steps: make([]*types.ExecutorTaskStepStatus, len(rct.Steps)), - ExecutorID: executor.ID, - }, - DockerRegistriesAuth: rct.DockerRegistriesAuth, - } - - for i := range et.Status.Steps { - et.Status.Steps[i] = &types.ExecutorTaskStepStatus{ - Phase: types.ExecutorTaskPhaseNotStarted, - } - } - - // calculate workspace operations - // TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer. - // this simplifies the workspaces extractions since they could be extracted in any order. We make them ordered just for reproducibility - wsops := []types.WorkspaceOperation{} - rctAllParents := runconfig.GetAllParents(rc.Tasks, rct) - - // sort parents by level and name just for reproducibility - sort.Sort(parentsByLevelName(rctAllParents)) - - for _, rctParent := range rctAllParents { - log.Debugf("rctParent: %s", util.Dump(rctParent)) - for _, archiveStep := range r.Tasks[rctParent.ID].WorkspaceArchives { - wsop := types.WorkspaceOperation{TaskID: rctParent.ID, Step: archiveStep} - wsops = append(wsops, wsop) - } - } - - et.WorkspaceOperations = wsops - - return et -} - // sendExecutorTask sends executor task to executor, if this fails the executor // will periodically fetch the executortask anyway func (s *Runservice) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error { - executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID) + executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID) if err != nil && err != etcd.ErrKeyNotFound { return err } if executor == nil { - log.Warnf("executor with id %q doesn't exist", et.Status.ExecutorID) + log.Warnf("executor with id %q doesn't exist", et.Spec.ExecutorID) return nil } + r, _, err := store.GetRun(ctx, s.e, et.Spec.RunID) + if err != nil { + return err + } + rc, err := store.OSTGetRunConfig(s.dm, r.ID) + if err != nil { + return errors.Errorf("cannot get run config %q: %w", r.ID, err) + } + rt, ok := r.Tasks[et.ID] + if !ok { + return errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID) + } + + // take a copy to not change the input executorTask + et = et.DeepCopy() + + // generate ExecutorTaskSpecData + et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc) + etj, err := json.Marshal(et) if err != nil { return err } - r, err := http.Post(executor.ListenURL+"/api/v1alpha/executor", "", bytes.NewReader(etj)) + req, err := http.Post(executor.ListenURL+"/api/v1alpha/executor", "", bytes.NewReader(etj)) if err != nil { return err } - if r.StatusCode != http.StatusOK { - return errors.Errorf("received http status: %d", r.StatusCode) + if req.StatusCode != http.StatusOK { + return errors.Errorf("received http status: %d", req.StatusCode) } return nil @@ -549,7 +483,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru // if the run is set to stop, stop all tasks if r.Stop { for _, et := range activeExecutorTasks { - et.Stop = true + et.Spec.Stop = true if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { return err } @@ -664,7 +598,7 @@ func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeEx } func (s *Runservice) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error { - r, _, err := store.GetRun(ctx, s.e, et.RunID) + r, _, err := store.GetRun(ctx, s.e, et.Spec.RunID) if err != nil { return err } @@ -819,7 +753,7 @@ func (s *Runservice) executorTasksCleaner(ctx context.Context) error { func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.ExecutorTask) error { log.Debugf("et: %s", util.Dump(et)) if et.Status.Phase.IsFinished() { - r, _, err := store.GetRun(ctx, s.e, et.RunID) + r, _, err := store.GetRun(ctx, s.e, et.Spec.RunID) if err != nil { if err == etcd.ErrKeyNotFound { // run doesn't exists, remove executor task @@ -835,8 +769,8 @@ func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.Executor if r.Phase.IsFinished() { // if the run is finished mark the executor tasks to stop - if !et.Stop { - et.Stop = true + if !et.Spec.Stop { + et.Spec.Stop = true if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { return err } @@ -850,13 +784,13 @@ func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.Executor if !et.Status.Phase.IsFinished() { // if the executor doesn't exists anymore mark the not finished executor tasks as failed - executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID) + executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID) if err != nil && err != etcd.ErrKeyNotFound { return err } if executor == nil { - log.Warnf("executor with id %q doesn't exist. marking executor task %q as failed", et.Status.ExecutorID, et.ID) - et.FailError = "executor deleted" + log.Warnf("executor with id %q doesn't exist. marking executor task %q as failed", et.Spec.ExecutorID, et.ID) + et.Status.FailError = "executor deleted" et.Status.Phase = types.ExecutorTaskPhaseFailed et.Status.EndTime = util.TimeP(time.Now()) for _, s := range et.Status.Steps { @@ -947,12 +881,12 @@ func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool } return nil } - executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID) + executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID) if err != nil && err != etcd.ErrKeyNotFound { return err } if executor == nil { - log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Status.ExecutorID) + log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Spec.ExecutorID) return nil } @@ -1107,12 +1041,12 @@ func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnu log.Errorf("executor task with id %q doesn't exist. This shouldn't happen. Skipping fetching", rt.ID) return nil } - executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID) + executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID) if err != nil && err != etcd.ErrKeyNotFound { return err } if executor == nil { - log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Status.ExecutorID) + log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Spec.ExecutorID) return nil } diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index a54bcf1..207c8d6 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -37,58 +37,6 @@ const ( MaxChangegroupNameLength = 256 ) -func OSTSubGroupsAndGroupTypes(group string) []string { - h := util.PathHierarchy(group) - if len(h)%2 != 1 { - panic(fmt.Errorf("wrong group path %q", group)) - } - - return h -} - -func OSTRootGroup(group string) string { - pl := util.PathList(group) - if len(pl) < 2 { - panic(fmt.Errorf("cannot determine root group name, wrong group path %q", group)) - } - - return pl[1] -} - -func OSTSubGroups(group string) []string { - h := util.PathHierarchy(group) - if len(h)%2 != 1 { - panic(fmt.Errorf("wrong group path %q", group)) - } - - // remove group types - sg := []string{} - for i, g := range h { - if i%2 == 0 { - sg = append(sg, g) - } - } - - return sg -} - -func OSTSubGroupTypes(group string) []string { - h := util.PathHierarchy(group) - if len(h)%2 != 1 { - panic(fmt.Errorf("wrong group path %q", group)) - } - - // remove group names - sg := []string{} - for i, g := range h { - if i%2 == 1 { - sg = append(sg, g) - } - } - - return sg -} - func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*datamanager.Action, error) { // use the first group dir after the root pl := util.PathList(group) @@ -364,7 +312,7 @@ func GetExecutorTasks(ctx context.Context, e *etcd.Store, executorID string) ([] return nil, err } et.Revision = kv.ModRevision - if et.Status.ExecutorID == executorID { + if et.Spec.ExecutorID == executorID { ets = append(ets, et) } } diff --git a/services/runservice/types/types.go b/services/runservice/types/types.go index 2d8e57f..45987af 100644 --- a/services/runservice/types/types.go +++ b/services/runservice/types/types.go @@ -459,9 +459,38 @@ func (s ExecutorTaskPhase) IsFinished() bool { } type ExecutorTask struct { - Revision int64 `json:"revision,omitempty"` - ID string `json:"id,omitempty"` - RunID string `json:"run_id,omitempty"` + ID string `json:"id,omitempty"` + + Spec ExecutorTaskSpec `json:"spec,omitempty"` + + Status ExecutorTaskStatus `json:"status,omitempty"` + + // internal values not saved + Revision int64 `json:"-"` +} + +func (et *ExecutorTask) DeepCopy() *ExecutorTask { + net, err := copystructure.Copy(et) + if err != nil { + panic(err) + } + return net.(*ExecutorTask) +} + +type ExecutorTaskSpec struct { + ExecutorID string `json:"executor_id,omitempty"` + RunID string `json:"run_id,omitempty"` + + // Stop is used to signal from the scheduler when the task must be stopped + Stop bool `json:"stop,omitempty"` + + *ExecutorTaskSpecData +} + +// ExecutorTaskSpecData defines the task data required to execute the tasks. These +// values are not saved in etcd to avoid exceeding the max etcd value size but +// are generated everytime they are sent to the executor +type ExecutorTaskSpecData struct { TaskName string `json:"task_name,omitempty"` Arch types.Arch `json:"arch,omitempty"` Containers []*Container `json:"containers,omitempty"` @@ -471,27 +500,24 @@ type ExecutorTask struct { User string `json:"user,omitempty"` Privileged bool `json:"privileged"` - DockerRegistriesAuth map[string]DockerRegistryAuth `json:"docker_registries_auth"` - - Steps Steps `json:"steps,omitempty"` - - Status ExecutorTaskStatus `json:"status,omitempty"` - SetupError string `fail_reason:"setup_error,omitempty"` - FailError string `fail_reason:"fail_error,omitempty"` - WorkspaceOperations []WorkspaceOperation `json:"workspace_operations,omitempty"` + DockerRegistriesAuth map[string]DockerRegistryAuth `json:"docker_registries_auth"` + // Cache prefix to use when asking for a cache key. To isolate caches between // groups (projects) CachePrefix string `json:"cache_prefix,omitempty"` - // Stop is used to signal from the scheduler when the task must be stopped - Stop bool `json:"stop,omitempty"` + Steps Steps `json:"steps,omitempty"` } type ExecutorTaskStatus struct { - ExecutorID string `json:"executor_id,omitempty"` - Phase ExecutorTaskPhase `json:"phase,omitempty"` + ID string `json:"id,omitempty"` + Revision int64 `json:"revision,omitempty"` + + Phase ExecutorTaskPhase `json:"phase,omitempty"` + + FailError string `json:"fail_error,omitempty"` SetupStep ExecutorTaskStepStatus `json:"setup_step,omitempty"` Steps []*ExecutorTaskStepStatus `json:"steps,omitempty"`