runservice: don't save executor task data in etcd

Reorganize ExecutorTask to better distinguish between the task Spec and
the Status.

Split the task Spec in a sub part called ExecutorTaskSpecData that contains
tasks data that don't have to be saved in etcd because it contains data that can
be very big and can be generated starting from the run and the runconfig.
This commit is contained in:
Simone Gotti 2019-09-12 10:36:45 +02:00
parent 947be9a742
commit 12b02143b2
9 changed files with 349 additions and 230 deletions

View File

@ -67,9 +67,9 @@ func (e *Executor) getAllPods(ctx context.Context, all bool) ([]driver.Pod, erro
func stepUser(t *types.ExecutorTask) string { func stepUser(t *types.ExecutorTask) string {
// use the container specified user and override with task user if defined // use the container specified user and override with task user if defined
user := t.Containers[0].User user := t.Spec.Containers[0].User
if t.User != "" { if t.Spec.User != "" {
user = t.User user = t.Spec.User
} }
return user return user
@ -122,8 +122,8 @@ func (e *Executor) doRunStep(ctx context.Context, s *types.RunStep, t *types.Exe
// TODO(sgotti) this line is used only for old runconfig versions that don't // TODO(sgotti) this line is used only for old runconfig versions that don't
// set a task default shell in the runconfig // set a task default shell in the runconfig
shell := defaultShell shell := defaultShell
if t.Shell != "" { if t.Spec.Shell != "" {
shell = t.Shell shell = t.Spec.Shell
} }
if s.Shell != "" { if s.Shell != "" {
shell = s.Shell shell = s.Shell
@ -143,14 +143,14 @@ func (e *Executor) doRunStep(ctx context.Context, s *types.RunStep, t *types.Exe
} }
// override task working dir with runstep working dir if provided // override task working dir with runstep working dir if provided
workingDir := t.WorkingDir workingDir := t.Spec.WorkingDir
if s.WorkingDir != "" { if s.WorkingDir != "" {
workingDir = s.WorkingDir workingDir = s.WorkingDir
} }
// generate the environment using the task environment and then overriding with the runstep environment // generate the environment using the task environment and then overriding with the runstep environment
environment := map[string]string{} environment := map[string]string{}
for envName, envValue := range t.Environment { for envName, envValue := range t.Spec.Environment {
environment[envName] = envValue environment[envName] = envValue
} }
for envName, envValue := range s.Environment { for envName, envValue := range s.Environment {
@ -208,15 +208,15 @@ func (e *Executor) doSaveToWorkspaceStep(ctx context.Context, s *types.SaveToWor
} }
defer archivef.Close() defer archivef.Close()
workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir)
if err != nil { if err != nil {
_, _ = logf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) _, _ = logf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err))
return -1, err return -1, err
} }
execConfig := &driver.ExecConfig{ execConfig := &driver.ExecConfig{
Cmd: cmd, Cmd: cmd,
Env: t.Environment, Env: t.Spec.Environment,
WorkingDir: workingDir, WorkingDir: workingDir,
User: stepUser(t), User: stepUser(t),
AttachStdin: true, AttachStdin: true,
@ -278,7 +278,7 @@ func (e *Executor) expandDir(ctx context.Context, t *types.ExecutorTask, pod dri
execConfig := &driver.ExecConfig{ execConfig := &driver.ExecConfig{
Cmd: cmd, Cmd: cmd,
Env: t.Environment, Env: t.Spec.Environment,
User: stepUser(t), User: stepUser(t),
AttachStdin: true, AttachStdin: true,
Stdout: stdout, Stdout: stdout,
@ -307,7 +307,7 @@ func (e *Executor) mkdir(ctx context.Context, t *types.ExecutorTask, pod driver.
execConfig := &driver.ExecConfig{ execConfig := &driver.ExecConfig{
Cmd: cmd, Cmd: cmd,
Env: t.Environment, Env: t.Spec.Environment,
User: stepUser(t), User: stepUser(t),
AttachStdin: true, AttachStdin: true,
Stdout: logf, Stdout: logf,
@ -336,15 +336,15 @@ func (e *Executor) template(ctx context.Context, t *types.ExecutorTask, pod driv
// limit the template answer to max 1MiB // limit the template answer to max 1MiB
stdout := util.NewLimitedBuffer(1024 * 1024) stdout := util.NewLimitedBuffer(1024 * 1024)
workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir)
if err != nil { if err != nil {
_, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err))
return "", err return "", err
} }
execConfig := &driver.ExecConfig{ execConfig := &driver.ExecConfig{
Cmd: cmd, Cmd: cmd,
Env: t.Environment, Env: t.Spec.Environment,
WorkingDir: workingDir, WorkingDir: workingDir,
User: stepUser(t), User: stepUser(t),
AttachStdin: true, AttachStdin: true,
@ -384,15 +384,15 @@ func (e *Executor) unarchive(ctx context.Context, t *types.ExecutorTask, source
} }
cmd := append([]string{toolboxContainerPath, "unarchive"}, args...) cmd := append([]string{toolboxContainerPath, "unarchive"}, args...)
workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir)
if err != nil { if err != nil {
_, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err))
return err return err
} }
execConfig := &driver.ExecConfig{ execConfig := &driver.ExecConfig{
Cmd: cmd, Cmd: cmd,
Env: t.Environment, Env: t.Spec.Environment,
WorkingDir: workingDir, WorkingDir: workingDir,
User: stepUser(t), User: stepUser(t),
AttachStdin: true, AttachStdin: true,
@ -432,7 +432,7 @@ func (e *Executor) doRestoreWorkspaceStep(ctx context.Context, s *types.RestoreW
} }
defer logf.Close() defer logf.Close()
for _, op := range t.WorkspaceOperations { for _, op := range t.Spec.WorkspaceOperations {
log.Debugf("unarchiving workspace for taskID: %s, step: %d", level, op.TaskID, op.Step) log.Debugf("unarchiving workspace for taskID: %s, step: %d", level, op.TaskID, op.Step)
resp, err := e.runserviceClient.GetArchive(ctx, op.TaskID, op.Step) resp, err := e.runserviceClient.GetArchive(ctx, op.TaskID, op.Step)
if err != nil { if err != nil {
@ -473,7 +473,7 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep,
fmt.Fprintf(logf, "cache key %q\n", userKey) fmt.Fprintf(logf, "cache key %q\n", userKey)
// append cache prefix // append cache prefix
key := t.CachePrefix + "-" + userKey key := t.Spec.CachePrefix + "-" + userKey
// check that the cache key doesn't already exists // check that the cache key doesn't already exists
resp, err := e.runserviceClient.CheckCache(ctx, key, false) resp, err := e.runserviceClient.CheckCache(ctx, key, false)
@ -503,15 +503,15 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep,
} }
defer archivef.Close() defer archivef.Close()
workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir)
if err != nil { if err != nil {
_, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err))
return -1, err return -1, err
} }
execConfig := &driver.ExecConfig{ execConfig := &driver.ExecConfig{
Cmd: cmd, Cmd: cmd,
Env: t.Environment, Env: t.Spec.Environment,
WorkingDir: workingDir, WorkingDir: workingDir,
User: stepUser(t), User: stepUser(t),
AttachStdin: true, AttachStdin: true,
@ -605,7 +605,7 @@ func (e *Executor) doRestoreCacheStep(ctx context.Context, s *types.RestoreCache
fmt.Fprintf(logf, "cache key %q\n", userKey) fmt.Fprintf(logf, "cache key %q\n", userKey)
// append cache prefix // append cache prefix
key := t.CachePrefix + "-" + userKey key := t.Spec.CachePrefix + "-" + userKey
resp, err := e.runserviceClient.GetCache(ctx, key, true) resp, err := e.runserviceClient.GetCache(ctx, key, true)
if err != nil { if err != nil {
@ -855,7 +855,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
// error out if privileged containers are required but not allowed // error out if privileged containers are required but not allowed
requiresPrivilegedContainers := false requiresPrivilegedContainers := false
for _, c := range et.Containers { for _, c := range et.Spec.Containers {
if c.Privileged { if c.Privileged {
requiresPrivilegedContainers = true requiresPrivilegedContainers = true
break break
@ -868,7 +868,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
log.Debugf("starting pod") log.Debugf("starting pod")
dockerConfig, err := registry.GenDockerConfig(et.DockerRegistriesAuth, []string{et.Containers[0].Image}) dockerConfig, err := registry.GenDockerConfig(et.Spec.DockerRegistriesAuth, []string{et.Spec.Containers[0].Image})
if err != nil { if err != nil {
return err return err
} }
@ -878,12 +878,12 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
// tasks failed to start and don't clash with existing pods) // tasks failed to start and don't clash with existing pods)
ID: uuid.NewV4().String(), ID: uuid.NewV4().String(),
TaskID: et.ID, TaskID: et.ID,
Arch: et.Arch, Arch: et.Spec.Arch,
InitVolumeDir: toolboxContainerDir, InitVolumeDir: toolboxContainerDir,
DockerConfig: dockerConfig, DockerConfig: dockerConfig,
Containers: make([]*driver.ContainerConfig, len(et.Containers)), Containers: make([]*driver.ContainerConfig, len(et.Spec.Containers)),
} }
for i, c := range et.Containers { for i, c := range et.Spec.Containers {
var cmd []string var cmd []string
if i == 0 { if i == 0 {
cmd = []string{toolboxContainerPath, "sleeper"} cmd = []string{toolboxContainerPath, "sleeper"}
@ -909,10 +909,10 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
} }
_, _ = outf.WriteString("Pod started.\n") _, _ = outf.WriteString("Pod started.\n")
if et.WorkingDir != "" { if et.Spec.WorkingDir != "" {
_, _ = outf.WriteString(fmt.Sprintf("Creating working dir %q.\n", et.WorkingDir)) _, _ = outf.WriteString(fmt.Sprintf("Creating working dir %q.\n", et.Spec.WorkingDir))
if err := e.mkdir(ctx, et, pod, outf, et.WorkingDir); err != nil { if err := e.mkdir(ctx, et, pod, outf, et.Spec.WorkingDir); err != nil {
_, _ = outf.WriteString(fmt.Sprintf("Failed to create working dir %q. Error: %s\n", et.WorkingDir, err)) _, _ = outf.WriteString(fmt.Sprintf("Failed to create working dir %q. Error: %s\n", et.Spec.WorkingDir, err))
return err return err
} }
} }
@ -922,7 +922,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
} }
func (e *Executor) executeTaskSteps(ctx context.Context, rt *runningTask, pod driver.Pod) (int, error) { func (e *Executor) executeTaskSteps(ctx context.Context, rt *runningTask, pod driver.Pod) (int, error) {
for i, step := range rt.et.Steps { for i, step := range rt.et.Spec.Steps {
rt.Lock() rt.Lock()
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseRunning rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseRunning
rt.et.Status.Steps[i].StartTime = util.TimeP(time.Now()) rt.et.Status.Steps[i].StartTime = util.TimeP(time.Now())
@ -975,7 +975,7 @@ func (e *Executor) executeTaskSteps(ctx context.Context, rt *runningTask, pod dr
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseSuccess rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseSuccess
if err != nil { if err != nil {
if rt.et.Stop { if rt.et.Spec.Stop {
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseStopped rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseStopped
} else { } else {
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseFailed rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseFailed
@ -1154,11 +1154,11 @@ func (e *Executor) tasksUpdater(ctx context.Context) error {
func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) { func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) {
log.Debugf("et: %v", util.Dump(et)) log.Debugf("et: %v", util.Dump(et))
if et.Status.ExecutorID != e.id { if et.Spec.ExecutorID != e.id {
return return
} }
if et.Stop { if et.Spec.Stop {
e.stopTask(ctx, et) e.stopTask(ctx, et)
} }

View File

@ -585,3 +585,58 @@ func (h *ActionHandler) getRunCounter(ctx context.Context, group string) (uint64
return c, cgt, nil return c, cgt, nil
} }
func (h *ActionHandler) GetExecutorTask(ctx context.Context, etID string) (*types.ExecutorTask, error) {
et, err := store.GetExecutorTask(ctx, h.e, etID)
if err != nil && err != etcd.ErrKeyNotFound {
return nil, err
}
if et == nil {
return nil, util.NewErrNotFound(errors.Errorf("executor task %q not found", etID))
}
r, _, err := store.GetRun(ctx, h.e, et.Spec.RunID)
if err != nil {
return nil, errors.Errorf("cannot get run %q: %w", et.Spec.RunID, err)
}
rc, err := store.OSTGetRunConfig(h.dm, r.ID)
if err != nil {
return nil, errors.Errorf("cannot get run config %q: %w", r.ID, err)
}
rt, ok := r.Tasks[et.ID]
if !ok {
return nil, errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID)
}
// generate ExecutorTaskSpecData
et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc)
return et, nil
}
func (h *ActionHandler) GetExecutorTasks(ctx context.Context, executorID string) ([]*types.ExecutorTask, error) {
ets, err := store.GetExecutorTasks(ctx, h.e, executorID)
if err != nil && err != etcd.ErrKeyNotFound {
return nil, err
}
for _, et := range ets {
r, _, err := store.GetRun(ctx, h.e, et.Spec.RunID)
if err != nil {
return nil, errors.Errorf("cannot get run %q: %w", et.Spec.RunID, err)
}
rc, err := store.OSTGetRunConfig(h.dm, r.ID)
if err != nil {
return nil, errors.Errorf("cannot get run config %q: %w", r.ID, err)
}
rt, ok := r.Tasks[et.ID]
if !ok {
return nil, errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID)
}
// generate ExecutorTaskSpecData
et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc)
}
return ets, nil
}

View File

@ -244,12 +244,12 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se
if err != nil { if err != nil {
return err, true return err, true
} }
executor, err := store.GetExecutor(ctx, h.e, et.Status.ExecutorID) executor, err := store.GetExecutor(ctx, h.e, et.Spec.ExecutorID)
if err != nil && err != etcd.ErrKeyNotFound { if err != nil && err != etcd.ErrKeyNotFound {
return err, true return err, true
} }
if executor == nil { if executor == nil {
return common.NewErrNotExist(errors.Errorf("executor with id %q doesn't exist", et.Status.ExecutorID)), true return common.NewErrNotExist(errors.Errorf("executor with id %q doesn't exist", et.Spec.ExecutorID)), true
} }
var url string var url string

View File

@ -29,7 +29,9 @@ import (
"agola.io/agola/internal/services/runservice/action" "agola.io/agola/internal/services/runservice/action"
"agola.io/agola/internal/services/runservice/common" "agola.io/agola/internal/services/runservice/common"
"agola.io/agola/internal/services/runservice/store" "agola.io/agola/internal/services/runservice/store"
"agola.io/agola/internal/util"
"agola.io/agola/services/runservice/types" "agola.io/agola/services/runservice/types"
errors "golang.org/x/xerrors"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"go.uber.org/zap" "go.uber.org/zap"
@ -137,11 +139,12 @@ func (h *ExecutorTaskStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Req
} }
type ExecutorTaskHandler struct { type ExecutorTaskHandler struct {
e *etcd.Store log *zap.SugaredLogger
ah *action.ActionHandler
} }
func NewExecutorTaskHandler(e *etcd.Store) *ExecutorTaskHandler { func NewExecutorTaskHandler(logger *zap.Logger, ah *action.ActionHandler) *ExecutorTaskHandler {
return &ExecutorTaskHandler{e: e} return &ExecutorTaskHandler{log: logger.Sugar(), ah: ah}
} }
func (h *ExecutorTaskHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *ExecutorTaskHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -151,32 +154,28 @@ func (h *ExecutorTaskHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
// TODO(sgotti) Check authorized call from executors // TODO(sgotti) Check authorized call from executors
etID := vars["taskid"] etID := vars["taskid"]
if etID == "" { if etID == "" {
http.Error(w, "", http.StatusBadRequest) httpError(w, util.NewErrBadRequest(errors.Errorf("taskid is empty")))
return return
} }
et, err := store.GetExecutorTask(ctx, h.e, etID) et, err := h.ah.GetExecutorTask(ctx, etID)
if err != nil && err != etcd.ErrKeyNotFound { if httpError(w, err) {
http.Error(w, "", http.StatusInternalServerError) h.log.Errorf("err: %+v", err)
return
}
if et == nil {
http.Error(w, "", http.StatusNotFound)
return return
} }
if err := json.NewEncoder(w).Encode(et); err != nil { if err := httpResponse(w, http.StatusOK, et); err != nil {
http.Error(w, "", http.StatusInternalServerError) h.log.Errorf("err: %+v", err)
return
} }
} }
type ExecutorTasksHandler struct { type ExecutorTasksHandler struct {
e *etcd.Store log *zap.SugaredLogger
ah *action.ActionHandler
} }
func NewExecutorTasksHandler(e *etcd.Store) *ExecutorTasksHandler { func NewExecutorTasksHandler(logger *zap.Logger, ah *action.ActionHandler) *ExecutorTasksHandler {
return &ExecutorTasksHandler{e: e} return &ExecutorTasksHandler{log: logger.Sugar(), ah: ah}
} }
func (h *ExecutorTasksHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *ExecutorTasksHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -190,7 +189,7 @@ func (h *ExecutorTasksHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
return return
} }
ets, err := store.GetExecutorTasks(ctx, h.e, executorID) ets, err := h.ah.GetExecutorTasks(ctx, executorID)
if err != nil { if err != nil {
http.Error(w, "", http.StatusInternalServerError) http.Error(w, "", http.StatusInternalServerError)
return return

View File

@ -15,7 +15,13 @@
package common package common
import ( import (
"fmt"
"path" "path"
"sort"
"agola.io/agola/internal/runconfig"
"agola.io/agola/internal/util"
"agola.io/agola/services/runservice/types"
) )
const ( const (
@ -81,3 +87,154 @@ const (
DataTypeRunConfig DataType = "runconfig" DataTypeRunConfig DataType = "runconfig"
DataTypeRunCounter DataType = "runcounter" DataTypeRunCounter DataType = "runcounter"
) )
func OSTSubGroupsAndGroupTypes(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
return h
}
func OSTRootGroup(group string) string {
pl := util.PathList(group)
if len(pl) < 2 {
panic(fmt.Errorf("cannot determine root group name, wrong group path %q", group))
}
return pl[1]
}
func OSTSubGroups(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
// remove group types
sg := []string{}
for i, g := range h {
if i%2 == 0 {
sg = append(sg, g)
}
}
return sg
}
func OSTSubGroupTypes(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
// remove group names
sg := []string{}
for i, g := range h {
if i%2 == 1 {
sg = append(sg, g)
}
}
return sg
}
type parentsByLevelName []*types.RunConfigTask
func (p parentsByLevelName) Len() int { return len(p) }
func (p parentsByLevelName) Less(i, j int) bool {
if p[i].Level != p[j].Level {
return p[i].Level < p[j].Level
}
return p[i].Name < p[j].Name
}
func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func mergeEnv(dest, src map[string]string) {
for k, v := range src {
dest[k] = v
}
}
func GenExecutorTaskSpecData(r *types.Run, rt *types.RunTask, rc *types.RunConfig) *types.ExecutorTaskSpecData {
rct := rc.Tasks[rt.ID]
environment := map[string]string{}
if rct.Environment != nil {
environment = rct.Environment
}
mergeEnv(environment, rc.StaticEnvironment)
// run config Environment variables ovverride every other environment variable
mergeEnv(environment, rc.Environment)
cachePrefix := OSTRootGroup(r.Group)
if rc.CacheGroup != "" {
cachePrefix = rc.CacheGroup
}
data := &types.ExecutorTaskSpecData{
// The executorTask ID must be the same as the runTask ID so we can detect if
// there's already an executorTask scheduled for that run task and we can get
// at most once task execution
TaskName: rct.Name,
Arch: rct.Runtime.Arch,
Containers: rct.Runtime.Containers,
Environment: environment,
WorkingDir: rct.WorkingDir,
Shell: rct.Shell,
User: rct.User,
Steps: rct.Steps,
CachePrefix: cachePrefix,
DockerRegistriesAuth: rct.DockerRegistriesAuth,
}
// calculate workspace operations
// TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer.
// this simplifies the workspaces extractions since they could be extracted in any order. We make them ordered just for reproducibility
wsops := []types.WorkspaceOperation{}
rctAllParents := runconfig.GetAllParents(rc.Tasks, rct)
// sort parents by level and name just for reproducibility
sort.Sort(parentsByLevelName(rctAllParents))
for _, rctParent := range rctAllParents {
for _, archiveStep := range r.Tasks[rctParent.ID].WorkspaceArchives {
wsop := types.WorkspaceOperation{TaskID: rctParent.ID, Step: archiveStep}
wsops = append(wsops, wsop)
}
}
data.WorkspaceOperations = wsops
return data
}
func GenExecutorTask(r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask {
rct := rc.Tasks[rt.ID]
et := &types.ExecutorTask{
// The executorTask ID must be the same as the runTask ID so we can detect if
// there's already an executorTask scheduled for that run task and we can get
// at most once task execution
ID: rt.ID,
Spec: types.ExecutorTaskSpec{
ExecutorID: executor.ID,
RunID: r.ID,
// ExecutorTaskSpecData is not saved in etcd to avoid exceeding the max etcd value
// size but is generated everytime the executor task is sent to the executor
},
Status: types.ExecutorTaskStatus{
Phase: types.ExecutorTaskPhaseNotStarted,
Steps: make([]*types.ExecutorTaskStepStatus, len(rct.Steps)),
},
}
for i := range et.Status.Steps {
et.Status.Steps[i] = &types.ExecutorTaskStepStatus{
Phase: types.ExecutorTaskPhaseNotStarted,
}
}
return et
}

View File

@ -211,8 +211,8 @@ func (s *Runservice) setupDefaultRouter(etCh chan *types.ExecutorTask) http.Hand
// executor dedicated api, only calls from executor should happen on these handlers // executor dedicated api, only calls from executor should happen on these handlers
executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah) executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah)
executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, etCh) executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, etCh)
executorTaskHandler := api.NewExecutorTaskHandler(s.e) executorTaskHandler := api.NewExecutorTaskHandler(logger, s.ah)
executorTasksHandler := api.NewExecutorTasksHandler(s.e) executorTasksHandler := api.NewExecutorTasksHandler(logger, s.ah)
archivesHandler := api.NewArchivesHandler(logger, s.ost) archivesHandler := api.NewArchivesHandler(logger, s.ost)
cacheHandler := api.NewCacheHandler(logger, s.ost) cacheHandler := api.NewCacheHandler(logger, s.ost)
cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost) cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost)

View File

@ -20,7 +20,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"sort"
"strconv" "strconv"
"time" "time"
@ -52,12 +51,6 @@ var level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
var logger = slog.New(level) var logger = slog.New(level)
var log = logger.Sugar() var log = logger.Sugar()
func mergeEnv(dest, src map[string]string) {
for k, v := range src {
dest[k] = v
}
}
func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) { func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) {
// the real source of active tasks is the number of executor tasks in etcd // the real source of active tasks is the number of executor tasks in etcd
// we can't rely on RunTask.Status since it's only updated when receiveing // we can't rely on RunTask.Status since it's only updated when receiveing
@ -255,7 +248,7 @@ func (s *Runservice) submitRunTasks(ctx context.Context, r *types.Run, rc *types
return nil return nil
} }
et := s.genExecutorTask(ctx, r, rt, rc, executor) et := common.GenExecutorTask(r, rt, rc, executor)
log.Debugf("et: %s", util.Dump(et)) log.Debugf("et: %s", util.Dump(et))
// check that the executorTask wasn't already scheduled // check that the executorTask wasn't already scheduled
@ -333,107 +326,48 @@ func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *type
return nil return nil
} }
type parentsByLevelName []*types.RunConfigTask
func (p parentsByLevelName) Len() int { return len(p) }
func (p parentsByLevelName) Less(i, j int) bool {
if p[i].Level != p[j].Level {
return p[i].Level < p[j].Level
}
return p[i].Name < p[j].Name
}
func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (s *Runservice) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask {
rct := rc.Tasks[rt.ID]
environment := map[string]string{}
if rct.Environment != nil {
environment = rct.Environment
}
mergeEnv(environment, rc.StaticEnvironment)
// run config Environment variables ovverride every other environment variable
mergeEnv(environment, rc.Environment)
cachePrefix := store.OSTRootGroup(r.Group)
if rc.CacheGroup != "" {
cachePrefix = rc.CacheGroup
}
et := &types.ExecutorTask{
// The executorTask ID must be the same as the runTask ID so we can detect if
// there's already an executorTask scheduled for that run task and we can get
// at most once task execution
ID: rt.ID,
RunID: r.ID,
TaskName: rct.Name,
Arch: rct.Runtime.Arch,
Containers: rct.Runtime.Containers,
Environment: environment,
WorkingDir: rct.WorkingDir,
Shell: rct.Shell,
User: rct.User,
Steps: rct.Steps,
CachePrefix: cachePrefix,
Status: types.ExecutorTaskStatus{
Phase: types.ExecutorTaskPhaseNotStarted,
Steps: make([]*types.ExecutorTaskStepStatus, len(rct.Steps)),
ExecutorID: executor.ID,
},
DockerRegistriesAuth: rct.DockerRegistriesAuth,
}
for i := range et.Status.Steps {
et.Status.Steps[i] = &types.ExecutorTaskStepStatus{
Phase: types.ExecutorTaskPhaseNotStarted,
}
}
// calculate workspace operations
// TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer.
// this simplifies the workspaces extractions since they could be extracted in any order. We make them ordered just for reproducibility
wsops := []types.WorkspaceOperation{}
rctAllParents := runconfig.GetAllParents(rc.Tasks, rct)
// sort parents by level and name just for reproducibility
sort.Sort(parentsByLevelName(rctAllParents))
for _, rctParent := range rctAllParents {
log.Debugf("rctParent: %s", util.Dump(rctParent))
for _, archiveStep := range r.Tasks[rctParent.ID].WorkspaceArchives {
wsop := types.WorkspaceOperation{TaskID: rctParent.ID, Step: archiveStep}
wsops = append(wsops, wsop)
}
}
et.WorkspaceOperations = wsops
return et
}
// sendExecutorTask sends executor task to executor, if this fails the executor // sendExecutorTask sends executor task to executor, if this fails the executor
// will periodically fetch the executortask anyway // will periodically fetch the executortask anyway
func (s *Runservice) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error { func (s *Runservice) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error {
executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID) executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID)
if err != nil && err != etcd.ErrKeyNotFound { if err != nil && err != etcd.ErrKeyNotFound {
return err return err
} }
if executor == nil { if executor == nil {
log.Warnf("executor with id %q doesn't exist", et.Status.ExecutorID) log.Warnf("executor with id %q doesn't exist", et.Spec.ExecutorID)
return nil return nil
} }
r, _, err := store.GetRun(ctx, s.e, et.Spec.RunID)
if err != nil {
return err
}
rc, err := store.OSTGetRunConfig(s.dm, r.ID)
if err != nil {
return errors.Errorf("cannot get run config %q: %w", r.ID, err)
}
rt, ok := r.Tasks[et.ID]
if !ok {
return errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID)
}
// take a copy to not change the input executorTask
et = et.DeepCopy()
// generate ExecutorTaskSpecData
et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc)
etj, err := json.Marshal(et) etj, err := json.Marshal(et)
if err != nil { if err != nil {
return err return err
} }
r, err := http.Post(executor.ListenURL+"/api/v1alpha/executor", "", bytes.NewReader(etj)) req, err := http.Post(executor.ListenURL+"/api/v1alpha/executor", "", bytes.NewReader(etj))
if err != nil { if err != nil {
return err return err
} }
if r.StatusCode != http.StatusOK { if req.StatusCode != http.StatusOK {
return errors.Errorf("received http status: %d", r.StatusCode) return errors.Errorf("received http status: %d", req.StatusCode)
} }
return nil return nil
@ -549,7 +483,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
// if the run is set to stop, stop all tasks // if the run is set to stop, stop all tasks
if r.Stop { if r.Stop {
for _, et := range activeExecutorTasks { for _, et := range activeExecutorTasks {
et.Stop = true et.Spec.Stop = true
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
return err return err
} }
@ -664,7 +598,7 @@ func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeEx
} }
func (s *Runservice) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error { func (s *Runservice) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error {
r, _, err := store.GetRun(ctx, s.e, et.RunID) r, _, err := store.GetRun(ctx, s.e, et.Spec.RunID)
if err != nil { if err != nil {
return err return err
} }
@ -819,7 +753,7 @@ func (s *Runservice) executorTasksCleaner(ctx context.Context) error {
func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.ExecutorTask) error { func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.ExecutorTask) error {
log.Debugf("et: %s", util.Dump(et)) log.Debugf("et: %s", util.Dump(et))
if et.Status.Phase.IsFinished() { if et.Status.Phase.IsFinished() {
r, _, err := store.GetRun(ctx, s.e, et.RunID) r, _, err := store.GetRun(ctx, s.e, et.Spec.RunID)
if err != nil { if err != nil {
if err == etcd.ErrKeyNotFound { if err == etcd.ErrKeyNotFound {
// run doesn't exists, remove executor task // run doesn't exists, remove executor task
@ -835,8 +769,8 @@ func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.Executor
if r.Phase.IsFinished() { if r.Phase.IsFinished() {
// if the run is finished mark the executor tasks to stop // if the run is finished mark the executor tasks to stop
if !et.Stop { if !et.Spec.Stop {
et.Stop = true et.Spec.Stop = true
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil { if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
return err return err
} }
@ -850,13 +784,13 @@ func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.Executor
if !et.Status.Phase.IsFinished() { if !et.Status.Phase.IsFinished() {
// if the executor doesn't exists anymore mark the not finished executor tasks as failed // if the executor doesn't exists anymore mark the not finished executor tasks as failed
executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID) executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID)
if err != nil && err != etcd.ErrKeyNotFound { if err != nil && err != etcd.ErrKeyNotFound {
return err return err
} }
if executor == nil { if executor == nil {
log.Warnf("executor with id %q doesn't exist. marking executor task %q as failed", et.Status.ExecutorID, et.ID) log.Warnf("executor with id %q doesn't exist. marking executor task %q as failed", et.Spec.ExecutorID, et.ID)
et.FailError = "executor deleted" et.Status.FailError = "executor deleted"
et.Status.Phase = types.ExecutorTaskPhaseFailed et.Status.Phase = types.ExecutorTaskPhaseFailed
et.Status.EndTime = util.TimeP(time.Now()) et.Status.EndTime = util.TimeP(time.Now())
for _, s := range et.Status.Steps { for _, s := range et.Status.Steps {
@ -947,12 +881,12 @@ func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool
} }
return nil return nil
} }
executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID) executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID)
if err != nil && err != etcd.ErrKeyNotFound { if err != nil && err != etcd.ErrKeyNotFound {
return err return err
} }
if executor == nil { if executor == nil {
log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Status.ExecutorID) log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Spec.ExecutorID)
return nil return nil
} }
@ -1107,12 +1041,12 @@ func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnu
log.Errorf("executor task with id %q doesn't exist. This shouldn't happen. Skipping fetching", rt.ID) log.Errorf("executor task with id %q doesn't exist. This shouldn't happen. Skipping fetching", rt.ID)
return nil return nil
} }
executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID) executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID)
if err != nil && err != etcd.ErrKeyNotFound { if err != nil && err != etcd.ErrKeyNotFound {
return err return err
} }
if executor == nil { if executor == nil {
log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Status.ExecutorID) log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Spec.ExecutorID)
return nil return nil
} }

View File

@ -37,58 +37,6 @@ const (
MaxChangegroupNameLength = 256 MaxChangegroupNameLength = 256
) )
func OSTSubGroupsAndGroupTypes(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
return h
}
func OSTRootGroup(group string) string {
pl := util.PathList(group)
if len(pl) < 2 {
panic(fmt.Errorf("cannot determine root group name, wrong group path %q", group))
}
return pl[1]
}
func OSTSubGroups(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
// remove group types
sg := []string{}
for i, g := range h {
if i%2 == 0 {
sg = append(sg, g)
}
}
return sg
}
func OSTSubGroupTypes(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
// remove group names
sg := []string{}
for i, g := range h {
if i%2 == 1 {
sg = append(sg, g)
}
}
return sg
}
func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*datamanager.Action, error) { func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*datamanager.Action, error) {
// use the first group dir after the root // use the first group dir after the root
pl := util.PathList(group) pl := util.PathList(group)
@ -364,7 +312,7 @@ func GetExecutorTasks(ctx context.Context, e *etcd.Store, executorID string) ([]
return nil, err return nil, err
} }
et.Revision = kv.ModRevision et.Revision = kv.ModRevision
if et.Status.ExecutorID == executorID { if et.Spec.ExecutorID == executorID {
ets = append(ets, et) ets = append(ets, et)
} }
} }

View File

@ -459,9 +459,38 @@ func (s ExecutorTaskPhase) IsFinished() bool {
} }
type ExecutorTask struct { type ExecutorTask struct {
Revision int64 `json:"revision,omitempty"`
ID string `json:"id,omitempty"` ID string `json:"id,omitempty"`
Spec ExecutorTaskSpec `json:"spec,omitempty"`
Status ExecutorTaskStatus `json:"status,omitempty"`
// internal values not saved
Revision int64 `json:"-"`
}
func (et *ExecutorTask) DeepCopy() *ExecutorTask {
net, err := copystructure.Copy(et)
if err != nil {
panic(err)
}
return net.(*ExecutorTask)
}
type ExecutorTaskSpec struct {
ExecutorID string `json:"executor_id,omitempty"`
RunID string `json:"run_id,omitempty"` RunID string `json:"run_id,omitempty"`
// Stop is used to signal from the scheduler when the task must be stopped
Stop bool `json:"stop,omitempty"`
*ExecutorTaskSpecData
}
// ExecutorTaskSpecData defines the task data required to execute the tasks. These
// values are not saved in etcd to avoid exceeding the max etcd value size but
// are generated everytime they are sent to the executor
type ExecutorTaskSpecData struct {
TaskName string `json:"task_name,omitempty"` TaskName string `json:"task_name,omitempty"`
Arch types.Arch `json:"arch,omitempty"` Arch types.Arch `json:"arch,omitempty"`
Containers []*Container `json:"containers,omitempty"` Containers []*Container `json:"containers,omitempty"`
@ -471,28 +500,25 @@ type ExecutorTask struct {
User string `json:"user,omitempty"` User string `json:"user,omitempty"`
Privileged bool `json:"privileged"` Privileged bool `json:"privileged"`
DockerRegistriesAuth map[string]DockerRegistryAuth `json:"docker_registries_auth"`
Steps Steps `json:"steps,omitempty"`
Status ExecutorTaskStatus `json:"status,omitempty"`
SetupError string `fail_reason:"setup_error,omitempty"`
FailError string `fail_reason:"fail_error,omitempty"`
WorkspaceOperations []WorkspaceOperation `json:"workspace_operations,omitempty"` WorkspaceOperations []WorkspaceOperation `json:"workspace_operations,omitempty"`
DockerRegistriesAuth map[string]DockerRegistryAuth `json:"docker_registries_auth"`
// Cache prefix to use when asking for a cache key. To isolate caches between // Cache prefix to use when asking for a cache key. To isolate caches between
// groups (projects) // groups (projects)
CachePrefix string `json:"cache_prefix,omitempty"` CachePrefix string `json:"cache_prefix,omitempty"`
// Stop is used to signal from the scheduler when the task must be stopped Steps Steps `json:"steps,omitempty"`
Stop bool `json:"stop,omitempty"`
} }
type ExecutorTaskStatus struct { type ExecutorTaskStatus struct {
ExecutorID string `json:"executor_id,omitempty"` ID string `json:"id,omitempty"`
Revision int64 `json:"revision,omitempty"`
Phase ExecutorTaskPhase `json:"phase,omitempty"` Phase ExecutorTaskPhase `json:"phase,omitempty"`
FailError string `json:"fail_error,omitempty"`
SetupStep ExecutorTaskStepStatus `json:"setup_step,omitempty"` SetupStep ExecutorTaskStepStatus `json:"setup_step,omitempty"`
Steps []*ExecutorTaskStepStatus `json:"steps,omitempty"` Steps []*ExecutorTaskStepStatus `json:"steps,omitempty"`