Merge pull request #113 from sgotti/runservice_dont_save_executor_data_etcd

runservice: don't save executor task data in etcd
This commit is contained in:
Simone Gotti 2019-09-17 14:35:01 +02:00 committed by GitHub
commit 25c5002782
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 349 additions and 230 deletions

View File

@ -67,9 +67,9 @@ func (e *Executor) getAllPods(ctx context.Context, all bool) ([]driver.Pod, erro
func stepUser(t *types.ExecutorTask) string {
// use the container specified user and override with task user if defined
user := t.Containers[0].User
if t.User != "" {
user = t.User
user := t.Spec.Containers[0].User
if t.Spec.User != "" {
user = t.Spec.User
}
return user
@ -122,8 +122,8 @@ func (e *Executor) doRunStep(ctx context.Context, s *types.RunStep, t *types.Exe
// TODO(sgotti) this line is used only for old runconfig versions that don't
// set a task default shell in the runconfig
shell := defaultShell
if t.Shell != "" {
shell = t.Shell
if t.Spec.Shell != "" {
shell = t.Spec.Shell
}
if s.Shell != "" {
shell = s.Shell
@ -143,14 +143,14 @@ func (e *Executor) doRunStep(ctx context.Context, s *types.RunStep, t *types.Exe
}
// override task working dir with runstep working dir if provided
workingDir := t.WorkingDir
workingDir := t.Spec.WorkingDir
if s.WorkingDir != "" {
workingDir = s.WorkingDir
}
// generate the environment using the task environment and then overriding with the runstep environment
environment := map[string]string{}
for envName, envValue := range t.Environment {
for envName, envValue := range t.Spec.Environment {
environment[envName] = envValue
}
for envName, envValue := range s.Environment {
@ -208,15 +208,15 @@ func (e *Executor) doSaveToWorkspaceStep(ctx context.Context, s *types.SaveToWor
}
defer archivef.Close()
workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir)
if err != nil {
_, _ = logf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
_, _ = logf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err))
return -1, err
}
execConfig := &driver.ExecConfig{
Cmd: cmd,
Env: t.Environment,
Env: t.Spec.Environment,
WorkingDir: workingDir,
User: stepUser(t),
AttachStdin: true,
@ -278,7 +278,7 @@ func (e *Executor) expandDir(ctx context.Context, t *types.ExecutorTask, pod dri
execConfig := &driver.ExecConfig{
Cmd: cmd,
Env: t.Environment,
Env: t.Spec.Environment,
User: stepUser(t),
AttachStdin: true,
Stdout: stdout,
@ -307,7 +307,7 @@ func (e *Executor) mkdir(ctx context.Context, t *types.ExecutorTask, pod driver.
execConfig := &driver.ExecConfig{
Cmd: cmd,
Env: t.Environment,
Env: t.Spec.Environment,
User: stepUser(t),
AttachStdin: true,
Stdout: logf,
@ -336,15 +336,15 @@ func (e *Executor) template(ctx context.Context, t *types.ExecutorTask, pod driv
// limit the template answer to max 1MiB
stdout := util.NewLimitedBuffer(1024 * 1024)
workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir)
if err != nil {
_, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
_, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err))
return "", err
}
execConfig := &driver.ExecConfig{
Cmd: cmd,
Env: t.Environment,
Env: t.Spec.Environment,
WorkingDir: workingDir,
User: stepUser(t),
AttachStdin: true,
@ -384,15 +384,15 @@ func (e *Executor) unarchive(ctx context.Context, t *types.ExecutorTask, source
}
cmd := append([]string{toolboxContainerPath, "unarchive"}, args...)
workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir)
if err != nil {
_, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
_, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err))
return err
}
execConfig := &driver.ExecConfig{
Cmd: cmd,
Env: t.Environment,
Env: t.Spec.Environment,
WorkingDir: workingDir,
User: stepUser(t),
AttachStdin: true,
@ -432,7 +432,7 @@ func (e *Executor) doRestoreWorkspaceStep(ctx context.Context, s *types.RestoreW
}
defer logf.Close()
for _, op := range t.WorkspaceOperations {
for _, op := range t.Spec.WorkspaceOperations {
log.Debugf("unarchiving workspace for taskID: %s, step: %d", level, op.TaskID, op.Step)
resp, err := e.runserviceClient.GetArchive(ctx, op.TaskID, op.Step)
if err != nil {
@ -473,7 +473,7 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep,
fmt.Fprintf(logf, "cache key %q\n", userKey)
// append cache prefix
key := t.CachePrefix + "-" + userKey
key := t.Spec.CachePrefix + "-" + userKey
// check that the cache key doesn't already exists
resp, err := e.runserviceClient.CheckCache(ctx, key, false)
@ -503,15 +503,15 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep,
}
defer archivef.Close()
workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
workingDir, err := e.expandDir(ctx, t, pod, logf, t.Spec.WorkingDir)
if err != nil {
_, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
_, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.Spec.WorkingDir, err))
return -1, err
}
execConfig := &driver.ExecConfig{
Cmd: cmd,
Env: t.Environment,
Env: t.Spec.Environment,
WorkingDir: workingDir,
User: stepUser(t),
AttachStdin: true,
@ -605,7 +605,7 @@ func (e *Executor) doRestoreCacheStep(ctx context.Context, s *types.RestoreCache
fmt.Fprintf(logf, "cache key %q\n", userKey)
// append cache prefix
key := t.CachePrefix + "-" + userKey
key := t.Spec.CachePrefix + "-" + userKey
resp, err := e.runserviceClient.GetCache(ctx, key, true)
if err != nil {
@ -855,7 +855,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
// error out if privileged containers are required but not allowed
requiresPrivilegedContainers := false
for _, c := range et.Containers {
for _, c := range et.Spec.Containers {
if c.Privileged {
requiresPrivilegedContainers = true
break
@ -868,7 +868,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
log.Debugf("starting pod")
dockerConfig, err := registry.GenDockerConfig(et.DockerRegistriesAuth, []string{et.Containers[0].Image})
dockerConfig, err := registry.GenDockerConfig(et.Spec.DockerRegistriesAuth, []string{et.Spec.Containers[0].Image})
if err != nil {
return err
}
@ -878,12 +878,12 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
// tasks failed to start and don't clash with existing pods)
ID: uuid.NewV4().String(),
TaskID: et.ID,
Arch: et.Arch,
Arch: et.Spec.Arch,
InitVolumeDir: toolboxContainerDir,
DockerConfig: dockerConfig,
Containers: make([]*driver.ContainerConfig, len(et.Containers)),
Containers: make([]*driver.ContainerConfig, len(et.Spec.Containers)),
}
for i, c := range et.Containers {
for i, c := range et.Spec.Containers {
var cmd []string
if i == 0 {
cmd = []string{toolboxContainerPath, "sleeper"}
@ -909,10 +909,10 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
}
_, _ = outf.WriteString("Pod started.\n")
if et.WorkingDir != "" {
_, _ = outf.WriteString(fmt.Sprintf("Creating working dir %q.\n", et.WorkingDir))
if err := e.mkdir(ctx, et, pod, outf, et.WorkingDir); err != nil {
_, _ = outf.WriteString(fmt.Sprintf("Failed to create working dir %q. Error: %s\n", et.WorkingDir, err))
if et.Spec.WorkingDir != "" {
_, _ = outf.WriteString(fmt.Sprintf("Creating working dir %q.\n", et.Spec.WorkingDir))
if err := e.mkdir(ctx, et, pod, outf, et.Spec.WorkingDir); err != nil {
_, _ = outf.WriteString(fmt.Sprintf("Failed to create working dir %q. Error: %s\n", et.Spec.WorkingDir, err))
return err
}
}
@ -922,7 +922,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
}
func (e *Executor) executeTaskSteps(ctx context.Context, rt *runningTask, pod driver.Pod) (int, error) {
for i, step := range rt.et.Steps {
for i, step := range rt.et.Spec.Steps {
rt.Lock()
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseRunning
rt.et.Status.Steps[i].StartTime = util.TimeP(time.Now())
@ -975,7 +975,7 @@ func (e *Executor) executeTaskSteps(ctx context.Context, rt *runningTask, pod dr
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseSuccess
if err != nil {
if rt.et.Stop {
if rt.et.Spec.Stop {
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseStopped
} else {
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseFailed
@ -1154,11 +1154,11 @@ func (e *Executor) tasksUpdater(ctx context.Context) error {
func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) {
log.Debugf("et: %v", util.Dump(et))
if et.Status.ExecutorID != e.id {
if et.Spec.ExecutorID != e.id {
return
}
if et.Stop {
if et.Spec.Stop {
e.stopTask(ctx, et)
}

View File

@ -585,3 +585,58 @@ func (h *ActionHandler) getRunCounter(ctx context.Context, group string) (uint64
return c, cgt, nil
}
func (h *ActionHandler) GetExecutorTask(ctx context.Context, etID string) (*types.ExecutorTask, error) {
et, err := store.GetExecutorTask(ctx, h.e, etID)
if err != nil && err != etcd.ErrKeyNotFound {
return nil, err
}
if et == nil {
return nil, util.NewErrNotFound(errors.Errorf("executor task %q not found", etID))
}
r, _, err := store.GetRun(ctx, h.e, et.Spec.RunID)
if err != nil {
return nil, errors.Errorf("cannot get run %q: %w", et.Spec.RunID, err)
}
rc, err := store.OSTGetRunConfig(h.dm, r.ID)
if err != nil {
return nil, errors.Errorf("cannot get run config %q: %w", r.ID, err)
}
rt, ok := r.Tasks[et.ID]
if !ok {
return nil, errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID)
}
// generate ExecutorTaskSpecData
et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc)
return et, nil
}
func (h *ActionHandler) GetExecutorTasks(ctx context.Context, executorID string) ([]*types.ExecutorTask, error) {
ets, err := store.GetExecutorTasks(ctx, h.e, executorID)
if err != nil && err != etcd.ErrKeyNotFound {
return nil, err
}
for _, et := range ets {
r, _, err := store.GetRun(ctx, h.e, et.Spec.RunID)
if err != nil {
return nil, errors.Errorf("cannot get run %q: %w", et.Spec.RunID, err)
}
rc, err := store.OSTGetRunConfig(h.dm, r.ID)
if err != nil {
return nil, errors.Errorf("cannot get run config %q: %w", r.ID, err)
}
rt, ok := r.Tasks[et.ID]
if !ok {
return nil, errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID)
}
// generate ExecutorTaskSpecData
et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc)
}
return ets, nil
}

View File

@ -244,12 +244,12 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se
if err != nil {
return err, true
}
executor, err := store.GetExecutor(ctx, h.e, et.Status.ExecutorID)
executor, err := store.GetExecutor(ctx, h.e, et.Spec.ExecutorID)
if err != nil && err != etcd.ErrKeyNotFound {
return err, true
}
if executor == nil {
return common.NewErrNotExist(errors.Errorf("executor with id %q doesn't exist", et.Status.ExecutorID)), true
return common.NewErrNotExist(errors.Errorf("executor with id %q doesn't exist", et.Spec.ExecutorID)), true
}
var url string

View File

@ -29,7 +29,9 @@ import (
"agola.io/agola/internal/services/runservice/action"
"agola.io/agola/internal/services/runservice/common"
"agola.io/agola/internal/services/runservice/store"
"agola.io/agola/internal/util"
"agola.io/agola/services/runservice/types"
errors "golang.org/x/xerrors"
"github.com/gorilla/mux"
"go.uber.org/zap"
@ -137,11 +139,12 @@ func (h *ExecutorTaskStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Req
}
type ExecutorTaskHandler struct {
e *etcd.Store
log *zap.SugaredLogger
ah *action.ActionHandler
}
func NewExecutorTaskHandler(e *etcd.Store) *ExecutorTaskHandler {
return &ExecutorTaskHandler{e: e}
func NewExecutorTaskHandler(logger *zap.Logger, ah *action.ActionHandler) *ExecutorTaskHandler {
return &ExecutorTaskHandler{log: logger.Sugar(), ah: ah}
}
func (h *ExecutorTaskHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -151,32 +154,28 @@ func (h *ExecutorTaskHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
// TODO(sgotti) Check authorized call from executors
etID := vars["taskid"]
if etID == "" {
http.Error(w, "", http.StatusBadRequest)
httpError(w, util.NewErrBadRequest(errors.Errorf("taskid is empty")))
return
}
et, err := store.GetExecutorTask(ctx, h.e, etID)
if err != nil && err != etcd.ErrKeyNotFound {
http.Error(w, "", http.StatusInternalServerError)
return
}
if et == nil {
http.Error(w, "", http.StatusNotFound)
et, err := h.ah.GetExecutorTask(ctx, etID)
if httpError(w, err) {
h.log.Errorf("err: %+v", err)
return
}
if err := json.NewEncoder(w).Encode(et); err != nil {
http.Error(w, "", http.StatusInternalServerError)
return
if err := httpResponse(w, http.StatusOK, et); err != nil {
h.log.Errorf("err: %+v", err)
}
}
type ExecutorTasksHandler struct {
e *etcd.Store
log *zap.SugaredLogger
ah *action.ActionHandler
}
func NewExecutorTasksHandler(e *etcd.Store) *ExecutorTasksHandler {
return &ExecutorTasksHandler{e: e}
func NewExecutorTasksHandler(logger *zap.Logger, ah *action.ActionHandler) *ExecutorTasksHandler {
return &ExecutorTasksHandler{log: logger.Sugar(), ah: ah}
}
func (h *ExecutorTasksHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -190,7 +189,7 @@ func (h *ExecutorTasksHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
return
}
ets, err := store.GetExecutorTasks(ctx, h.e, executorID)
ets, err := h.ah.GetExecutorTasks(ctx, executorID)
if err != nil {
http.Error(w, "", http.StatusInternalServerError)
return

View File

@ -15,7 +15,13 @@
package common
import (
"fmt"
"path"
"sort"
"agola.io/agola/internal/runconfig"
"agola.io/agola/internal/util"
"agola.io/agola/services/runservice/types"
)
const (
@ -81,3 +87,154 @@ const (
DataTypeRunConfig DataType = "runconfig"
DataTypeRunCounter DataType = "runcounter"
)
func OSTSubGroupsAndGroupTypes(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
return h
}
func OSTRootGroup(group string) string {
pl := util.PathList(group)
if len(pl) < 2 {
panic(fmt.Errorf("cannot determine root group name, wrong group path %q", group))
}
return pl[1]
}
func OSTSubGroups(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
// remove group types
sg := []string{}
for i, g := range h {
if i%2 == 0 {
sg = append(sg, g)
}
}
return sg
}
func OSTSubGroupTypes(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
// remove group names
sg := []string{}
for i, g := range h {
if i%2 == 1 {
sg = append(sg, g)
}
}
return sg
}
type parentsByLevelName []*types.RunConfigTask
func (p parentsByLevelName) Len() int { return len(p) }
func (p parentsByLevelName) Less(i, j int) bool {
if p[i].Level != p[j].Level {
return p[i].Level < p[j].Level
}
return p[i].Name < p[j].Name
}
func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func mergeEnv(dest, src map[string]string) {
for k, v := range src {
dest[k] = v
}
}
func GenExecutorTaskSpecData(r *types.Run, rt *types.RunTask, rc *types.RunConfig) *types.ExecutorTaskSpecData {
rct := rc.Tasks[rt.ID]
environment := map[string]string{}
if rct.Environment != nil {
environment = rct.Environment
}
mergeEnv(environment, rc.StaticEnvironment)
// run config Environment variables ovverride every other environment variable
mergeEnv(environment, rc.Environment)
cachePrefix := OSTRootGroup(r.Group)
if rc.CacheGroup != "" {
cachePrefix = rc.CacheGroup
}
data := &types.ExecutorTaskSpecData{
// The executorTask ID must be the same as the runTask ID so we can detect if
// there's already an executorTask scheduled for that run task and we can get
// at most once task execution
TaskName: rct.Name,
Arch: rct.Runtime.Arch,
Containers: rct.Runtime.Containers,
Environment: environment,
WorkingDir: rct.WorkingDir,
Shell: rct.Shell,
User: rct.User,
Steps: rct.Steps,
CachePrefix: cachePrefix,
DockerRegistriesAuth: rct.DockerRegistriesAuth,
}
// calculate workspace operations
// TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer.
// this simplifies the workspaces extractions since they could be extracted in any order. We make them ordered just for reproducibility
wsops := []types.WorkspaceOperation{}
rctAllParents := runconfig.GetAllParents(rc.Tasks, rct)
// sort parents by level and name just for reproducibility
sort.Sort(parentsByLevelName(rctAllParents))
for _, rctParent := range rctAllParents {
for _, archiveStep := range r.Tasks[rctParent.ID].WorkspaceArchives {
wsop := types.WorkspaceOperation{TaskID: rctParent.ID, Step: archiveStep}
wsops = append(wsops, wsop)
}
}
data.WorkspaceOperations = wsops
return data
}
func GenExecutorTask(r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask {
rct := rc.Tasks[rt.ID]
et := &types.ExecutorTask{
// The executorTask ID must be the same as the runTask ID so we can detect if
// there's already an executorTask scheduled for that run task and we can get
// at most once task execution
ID: rt.ID,
Spec: types.ExecutorTaskSpec{
ExecutorID: executor.ID,
RunID: r.ID,
// ExecutorTaskSpecData is not saved in etcd to avoid exceeding the max etcd value
// size but is generated everytime the executor task is sent to the executor
},
Status: types.ExecutorTaskStatus{
Phase: types.ExecutorTaskPhaseNotStarted,
Steps: make([]*types.ExecutorTaskStepStatus, len(rct.Steps)),
},
}
for i := range et.Status.Steps {
et.Status.Steps[i] = &types.ExecutorTaskStepStatus{
Phase: types.ExecutorTaskPhaseNotStarted,
}
}
return et
}

View File

@ -211,8 +211,8 @@ func (s *Runservice) setupDefaultRouter(etCh chan *types.ExecutorTask) http.Hand
// executor dedicated api, only calls from executor should happen on these handlers
executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah)
executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, etCh)
executorTaskHandler := api.NewExecutorTaskHandler(s.e)
executorTasksHandler := api.NewExecutorTasksHandler(s.e)
executorTaskHandler := api.NewExecutorTaskHandler(logger, s.ah)
executorTasksHandler := api.NewExecutorTasksHandler(logger, s.ah)
archivesHandler := api.NewArchivesHandler(logger, s.ost)
cacheHandler := api.NewCacheHandler(logger, s.ost)
cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost)

View File

@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"net/http"
"sort"
"strconv"
"time"
@ -52,12 +51,6 @@ var level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
var logger = slog.New(level)
var log = logger.Sugar()
func mergeEnv(dest, src map[string]string) {
for k, v := range src {
dest[k] = v
}
}
func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) {
// the real source of active tasks is the number of executor tasks in etcd
// we can't rely on RunTask.Status since it's only updated when receiveing
@ -255,7 +248,7 @@ func (s *Runservice) submitRunTasks(ctx context.Context, r *types.Run, rc *types
return nil
}
et := s.genExecutorTask(ctx, r, rt, rc, executor)
et := common.GenExecutorTask(r, rt, rc, executor)
log.Debugf("et: %s", util.Dump(et))
// check that the executorTask wasn't already scheduled
@ -333,107 +326,48 @@ func chooseExecutor(executors []*types.Executor, rct *types.RunConfigTask) *type
return nil
}
type parentsByLevelName []*types.RunConfigTask
func (p parentsByLevelName) Len() int { return len(p) }
func (p parentsByLevelName) Less(i, j int) bool {
if p[i].Level != p[j].Level {
return p[i].Level < p[j].Level
}
return p[i].Name < p[j].Name
}
func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (s *Runservice) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask {
rct := rc.Tasks[rt.ID]
environment := map[string]string{}
if rct.Environment != nil {
environment = rct.Environment
}
mergeEnv(environment, rc.StaticEnvironment)
// run config Environment variables ovverride every other environment variable
mergeEnv(environment, rc.Environment)
cachePrefix := store.OSTRootGroup(r.Group)
if rc.CacheGroup != "" {
cachePrefix = rc.CacheGroup
}
et := &types.ExecutorTask{
// The executorTask ID must be the same as the runTask ID so we can detect if
// there's already an executorTask scheduled for that run task and we can get
// at most once task execution
ID: rt.ID,
RunID: r.ID,
TaskName: rct.Name,
Arch: rct.Runtime.Arch,
Containers: rct.Runtime.Containers,
Environment: environment,
WorkingDir: rct.WorkingDir,
Shell: rct.Shell,
User: rct.User,
Steps: rct.Steps,
CachePrefix: cachePrefix,
Status: types.ExecutorTaskStatus{
Phase: types.ExecutorTaskPhaseNotStarted,
Steps: make([]*types.ExecutorTaskStepStatus, len(rct.Steps)),
ExecutorID: executor.ID,
},
DockerRegistriesAuth: rct.DockerRegistriesAuth,
}
for i := range et.Status.Steps {
et.Status.Steps[i] = &types.ExecutorTaskStepStatus{
Phase: types.ExecutorTaskPhaseNotStarted,
}
}
// calculate workspace operations
// TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer.
// this simplifies the workspaces extractions since they could be extracted in any order. We make them ordered just for reproducibility
wsops := []types.WorkspaceOperation{}
rctAllParents := runconfig.GetAllParents(rc.Tasks, rct)
// sort parents by level and name just for reproducibility
sort.Sort(parentsByLevelName(rctAllParents))
for _, rctParent := range rctAllParents {
log.Debugf("rctParent: %s", util.Dump(rctParent))
for _, archiveStep := range r.Tasks[rctParent.ID].WorkspaceArchives {
wsop := types.WorkspaceOperation{TaskID: rctParent.ID, Step: archiveStep}
wsops = append(wsops, wsop)
}
}
et.WorkspaceOperations = wsops
return et
}
// sendExecutorTask sends executor task to executor, if this fails the executor
// will periodically fetch the executortask anyway
func (s *Runservice) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error {
executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID)
executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID)
if err != nil && err != etcd.ErrKeyNotFound {
return err
}
if executor == nil {
log.Warnf("executor with id %q doesn't exist", et.Status.ExecutorID)
log.Warnf("executor with id %q doesn't exist", et.Spec.ExecutorID)
return nil
}
r, _, err := store.GetRun(ctx, s.e, et.Spec.RunID)
if err != nil {
return err
}
rc, err := store.OSTGetRunConfig(s.dm, r.ID)
if err != nil {
return errors.Errorf("cannot get run config %q: %w", r.ID, err)
}
rt, ok := r.Tasks[et.ID]
if !ok {
return errors.Errorf("no such run task with id %s for run %s", et.ID, r.ID)
}
// take a copy to not change the input executorTask
et = et.DeepCopy()
// generate ExecutorTaskSpecData
et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc)
etj, err := json.Marshal(et)
if err != nil {
return err
}
r, err := http.Post(executor.ListenURL+"/api/v1alpha/executor", "", bytes.NewReader(etj))
req, err := http.Post(executor.ListenURL+"/api/v1alpha/executor", "", bytes.NewReader(etj))
if err != nil {
return err
}
if r.StatusCode != http.StatusOK {
return errors.Errorf("received http status: %d", r.StatusCode)
if req.StatusCode != http.StatusOK {
return errors.Errorf("received http status: %d", req.StatusCode)
}
return nil
@ -549,7 +483,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
// if the run is set to stop, stop all tasks
if r.Stop {
for _, et := range activeExecutorTasks {
et.Stop = true
et.Spec.Stop = true
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
return err
}
@ -664,7 +598,7 @@ func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeEx
}
func (s *Runservice) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error {
r, _, err := store.GetRun(ctx, s.e, et.RunID)
r, _, err := store.GetRun(ctx, s.e, et.Spec.RunID)
if err != nil {
return err
}
@ -819,7 +753,7 @@ func (s *Runservice) executorTasksCleaner(ctx context.Context) error {
func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.ExecutorTask) error {
log.Debugf("et: %s", util.Dump(et))
if et.Status.Phase.IsFinished() {
r, _, err := store.GetRun(ctx, s.e, et.RunID)
r, _, err := store.GetRun(ctx, s.e, et.Spec.RunID)
if err != nil {
if err == etcd.ErrKeyNotFound {
// run doesn't exists, remove executor task
@ -835,8 +769,8 @@ func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.Executor
if r.Phase.IsFinished() {
// if the run is finished mark the executor tasks to stop
if !et.Stop {
et.Stop = true
if !et.Spec.Stop {
et.Spec.Stop = true
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
return err
}
@ -850,13 +784,13 @@ func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.Executor
if !et.Status.Phase.IsFinished() {
// if the executor doesn't exists anymore mark the not finished executor tasks as failed
executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID)
executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID)
if err != nil && err != etcd.ErrKeyNotFound {
return err
}
if executor == nil {
log.Warnf("executor with id %q doesn't exist. marking executor task %q as failed", et.Status.ExecutorID, et.ID)
et.FailError = "executor deleted"
log.Warnf("executor with id %q doesn't exist. marking executor task %q as failed", et.Spec.ExecutorID, et.ID)
et.Status.FailError = "executor deleted"
et.Status.Phase = types.ExecutorTaskPhaseFailed
et.Status.EndTime = util.TimeP(time.Now())
for _, s := range et.Status.Steps {
@ -947,12 +881,12 @@ func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool
}
return nil
}
executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID)
executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID)
if err != nil && err != etcd.ErrKeyNotFound {
return err
}
if executor == nil {
log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Status.ExecutorID)
log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Spec.ExecutorID)
return nil
}
@ -1107,12 +1041,12 @@ func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnu
log.Errorf("executor task with id %q doesn't exist. This shouldn't happen. Skipping fetching", rt.ID)
return nil
}
executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID)
executor, err := store.GetExecutor(ctx, s.e, et.Spec.ExecutorID)
if err != nil && err != etcd.ErrKeyNotFound {
return err
}
if executor == nil {
log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Status.ExecutorID)
log.Warnf("executor with id %q doesn't exist. Skipping fetching", et.Spec.ExecutorID)
return nil
}

View File

@ -37,58 +37,6 @@ const (
MaxChangegroupNameLength = 256
)
func OSTSubGroupsAndGroupTypes(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
return h
}
func OSTRootGroup(group string) string {
pl := util.PathList(group)
if len(pl) < 2 {
panic(fmt.Errorf("cannot determine root group name, wrong group path %q", group))
}
return pl[1]
}
func OSTSubGroups(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
// remove group types
sg := []string{}
for i, g := range h {
if i%2 == 0 {
sg = append(sg, g)
}
}
return sg
}
func OSTSubGroupTypes(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
// remove group names
sg := []string{}
for i, g := range h {
if i%2 == 1 {
sg = append(sg, g)
}
}
return sg
}
func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*datamanager.Action, error) {
// use the first group dir after the root
pl := util.PathList(group)
@ -364,7 +312,7 @@ func GetExecutorTasks(ctx context.Context, e *etcd.Store, executorID string) ([]
return nil, err
}
et.Revision = kv.ModRevision
if et.Status.ExecutorID == executorID {
if et.Spec.ExecutorID == executorID {
ets = append(ets, et)
}
}

View File

@ -459,9 +459,38 @@ func (s ExecutorTaskPhase) IsFinished() bool {
}
type ExecutorTask struct {
Revision int64 `json:"revision,omitempty"`
ID string `json:"id,omitempty"`
Spec ExecutorTaskSpec `json:"spec,omitempty"`
Status ExecutorTaskStatus `json:"status,omitempty"`
// internal values not saved
Revision int64 `json:"-"`
}
func (et *ExecutorTask) DeepCopy() *ExecutorTask {
net, err := copystructure.Copy(et)
if err != nil {
panic(err)
}
return net.(*ExecutorTask)
}
type ExecutorTaskSpec struct {
ExecutorID string `json:"executor_id,omitempty"`
RunID string `json:"run_id,omitempty"`
// Stop is used to signal from the scheduler when the task must be stopped
Stop bool `json:"stop,omitempty"`
*ExecutorTaskSpecData
}
// ExecutorTaskSpecData defines the task data required to execute the tasks. These
// values are not saved in etcd to avoid exceeding the max etcd value size but
// are generated everytime they are sent to the executor
type ExecutorTaskSpecData struct {
TaskName string `json:"task_name,omitempty"`
Arch types.Arch `json:"arch,omitempty"`
Containers []*Container `json:"containers,omitempty"`
@ -471,28 +500,25 @@ type ExecutorTask struct {
User string `json:"user,omitempty"`
Privileged bool `json:"privileged"`
DockerRegistriesAuth map[string]DockerRegistryAuth `json:"docker_registries_auth"`
Steps Steps `json:"steps,omitempty"`
Status ExecutorTaskStatus `json:"status,omitempty"`
SetupError string `fail_reason:"setup_error,omitempty"`
FailError string `fail_reason:"fail_error,omitempty"`
WorkspaceOperations []WorkspaceOperation `json:"workspace_operations,omitempty"`
DockerRegistriesAuth map[string]DockerRegistryAuth `json:"docker_registries_auth"`
// Cache prefix to use when asking for a cache key. To isolate caches between
// groups (projects)
CachePrefix string `json:"cache_prefix,omitempty"`
// Stop is used to signal from the scheduler when the task must be stopped
Stop bool `json:"stop,omitempty"`
Steps Steps `json:"steps,omitempty"`
}
type ExecutorTaskStatus struct {
ExecutorID string `json:"executor_id,omitempty"`
ID string `json:"id,omitempty"`
Revision int64 `json:"revision,omitempty"`
Phase ExecutorTaskPhase `json:"phase,omitempty"`
FailError string `json:"fail_error,omitempty"`
SetupStep ExecutorTaskStepStatus `json:"setup_step,omitempty"`
Steps []*ExecutorTaskStepStatus `json:"steps,omitempty"`