// Copyright 2019 Sorint.lab // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied // See the License for the specific language governing permissions and // limitations under the License. package executor import ( "bytes" "context" "encoding/json" "fmt" "io" "io/ioutil" "net" "net/http" "net/url" "os" "os/exec" "path/filepath" "strings" "sync" "time" uuid "github.com/satori/go.uuid" "github.com/sorintlab/agola/internal/common" slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/services/config" "github.com/sorintlab/agola/internal/services/runservice/executor/driver" rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" "github.com/gorilla/mux" sockaddr "github.com/hashicorp/go-sockaddr" "github.com/pkg/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) var level = zap.NewAtomicLevelAt(zapcore.InfoLevel) var logger = slog.New(level) var log = logger.Sugar() const ( defaultShell = "/bin/sh -e" taskIDLabel = "taskid" toolboxContainerDir = "/mnt/agola" ) var ( toolboxContainerPath = filepath.Join(toolboxContainerDir, "/agola-toolbox") ) func (e *Executor) getAllPods(ctx context.Context, all bool) ([]driver.Pod, error) { return e.driver.GetPodsByLabels(ctx, createAllLabels(), all) } func (e *Executor) createFile(ctx context.Context, pod driver.Pod, command, user string, outf io.Writer) (string, error) { cmd := []string{toolboxContainerPath, "createfile"} var buf bytes.Buffer execConfig := &driver.ExecConfig{ Cmd: cmd, Stdout: &buf, Stderr: outf, User: user, } ce, err := pod.Exec(ctx, execConfig) if err != nil { return "", err } stdin := ce.Stdin() go func() { io.WriteString(stdin, command) io.WriteString(stdin, "\n") stdin.Close() }() exitCode, err := ce.Wait(ctx) if err != nil { return "", err } if exitCode != 0 { return "", errors.Errorf("toolbox exited with code: %d", exitCode) } return buf.String(), nil } func (e *Executor) doRunStep(ctx context.Context, s *types.RunStep, t *types.ExecutorTask, pod driver.Pod, logPath string) (int, error) { if err := os.MkdirAll(filepath.Dir(logPath), 0770); err != nil { return -1, err } outf, err := os.Create(logPath) if err != nil { return -1, err } defer outf.Close() shell := defaultShell if t.Shell != "" { shell = t.Shell } if s.Shell != "" { shell = s.Shell } // try to use the container specified user user := t.Containers[0].User if t.User != "" { user = t.User } if s.User != "" { user = s.User } var cmd []string if s.Command != "" { filename, err := e.createFile(ctx, pod, s.Command, user, outf) if err != nil { return -1, errors.Errorf("create file err: %v", err) } args := strings.Split(shell, " ") cmd = append(args, filename) } else { cmd = strings.Split(shell, " ") } // override task working dir with runstep working dir if provided workingDir := t.WorkingDir if s.WorkingDir != "" { workingDir = s.WorkingDir } // generate the environment using the task environment and then overriding with the runstep environment environment := map[string]string{} for envName, envValue := range t.Environment { environment[envName] = envValue } for envName, envValue := range s.Environment { environment[envName] = envValue } execConfig := &driver.ExecConfig{ Cmd: cmd, Env: environment, WorkingDir: workingDir, User: user, Stdout: outf, Stderr: outf, Tty: true, } ce, err := pod.Exec(ctx, execConfig) if err != nil { return -1, err } exitCode, err := ce.Wait(ctx) if err != nil { return -1, err } return exitCode, nil } func (e *Executor) doSaveToWorkspaceStep(ctx context.Context, s *types.SaveToWorkspaceStep, t *types.ExecutorTask, pod driver.Pod, logPath string, archivePath string) (int, error) { cmd := []string{toolboxContainerPath, "archive"} if err := os.MkdirAll(filepath.Dir(logPath), 0770); err != nil { return -1, err } logf, err := os.Create(logPath) if err != nil { return -1, err } defer logf.Close() if err := os.MkdirAll(filepath.Dir(archivePath), 0770); err != nil { return -1, err } archivef, err := os.Create(archivePath) if err != nil { return -1, err } defer archivef.Close() execConfig := &driver.ExecConfig{ Cmd: cmd, Env: t.Environment, WorkingDir: t.WorkingDir, Stdout: archivef, Stderr: logf, } ce, err := pod.Exec(ctx, execConfig) if err != nil { return -1, err } type ArchiveInfo struct { SourceDir string DestDir string Paths []string } type Archive struct { ArchiveInfos []*ArchiveInfo OutFile string } a := &Archive{ OutFile: "", // use stdout ArchiveInfos: make([]*ArchiveInfo, len(s.Contents)), } for i, c := range s.Contents { a.ArchiveInfos[i] = &ArchiveInfo{ SourceDir: c.SourceDir, DestDir: c.DestDir, Paths: c.Paths, } } stdin := ce.Stdin() enc := json.NewEncoder(stdin) go func() { enc.Encode(a) stdin.Close() }() exitCode, err := ce.Wait(ctx) if err != nil { return -1, err } return exitCode, nil } func (e *Executor) unarchive(ctx context.Context, t *types.ExecutorTask, source io.Reader, pod driver.Pod, logf io.Writer, destDir string, overwrite, removeDestDir bool) error { args := []string{"--destdir", destDir} if overwrite { args = append(args, "--overwrite") } if removeDestDir { args = append(args, "--remove-destdir") } cmd := append([]string{toolboxContainerPath, "unarchive"}, args...) execConfig := &driver.ExecConfig{ Cmd: cmd, Env: t.Environment, WorkingDir: t.WorkingDir, Stdout: logf, Stderr: logf, } ce, err := pod.Exec(ctx, execConfig) if err != nil { return err } stdin := ce.Stdin() go func() { io.Copy(stdin, source) stdin.Close() }() exitCode, err := ce.Wait(ctx) if err != nil { return err } if exitCode != 0 { return errors.Errorf("unarchive ended with exit code %d", exitCode) } return nil } func (e *Executor) doRestoreWorkspaceStep(ctx context.Context, s *types.RestoreWorkspaceStep, t *types.ExecutorTask, pod driver.Pod, logPath string) (int, error) { if err := os.MkdirAll(filepath.Dir(logPath), 0770); err != nil { return -1, err } logf, err := os.Create(logPath) if err != nil { return -1, err } defer logf.Close() // TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer. for level, wl := range t.Workspace { log.Debugf("unarchiving archives at level %d", level) for _, archives := range wl { for _, archive := range archives { log.Debugf("unarchiving workspace at level %d, taskID: %s, step: %d", level, archive.TaskID, archive.Step) resp, err := e.runserviceClient.GetArchive(ctx, archive.TaskID, archive.Step) if err != nil { // TODO(sgotti) retry before giving up fmt.Fprintf(logf, "error reading workspace archive: %v\n", err) return -1, err } archivef := resp.Body if err := e.unarchive(ctx, t, archivef, pod, logf, s.DestDir, false, false); err != nil { archivef.Close() return -1, err } archivef.Close() } } } return 0, nil } func (e *Executor) executorIDPath() string { return filepath.Join(e.c.DataDir, "id") } func (e *Executor) tasksDir() string { return filepath.Join(e.c.DataDir, "tasks") } func (e *Executor) taskPath(taskID string) string { return filepath.Join(e.tasksDir(), taskID) } func (e *Executor) taskLogsPath(taskID string) string { return filepath.Join(e.tasksDir(), taskID, "logs") } func (e *Executor) setupLogPath(taskID string) string { return filepath.Join(e.taskLogsPath(taskID), "setup.log") } func (e *Executor) stepLogPath(taskID string, stepID int) string { return filepath.Join(e.taskLogsPath(taskID), "steps", fmt.Sprintf("%d.log", stepID)) } func (e *Executor) archivePath(taskID string, stepID int) string { return filepath.Join(e.taskPath(taskID), "archives", fmt.Sprintf("%d.tar", stepID)) } func (e *Executor) sendExecutorStatus(ctx context.Context) error { executor := &types.Executor{ ID: e.id, ListenURL: e.listenURL, } log.Debugf("send executor status: %s", util.Dump(executor)) _, err := e.runserviceClient.SendExecutorStatus(ctx, executor) return err } func (e *Executor) sendExecutorTaskStatus(ctx context.Context, et *types.ExecutorTask) error { log.Debugf("send executor task: %s. status: %s", et.ID, et.Status.Phase) _, err := e.runserviceClient.SendExecutorTaskStatus(ctx, e.id, et) return err } func (e *Executor) stopTask(ctx context.Context, et *types.ExecutorTask) { if rt, ok := e.runningTasks.get(et.ID); ok { rt.Lock() defer rt.Unlock() if rt.et.Status.Phase.IsFinished() { return } if rt.pod != nil { if err := rt.pod.Stop(ctx); err != nil { log.Errorf("err: %+v", err) return } if rt.et.Status.Phase == types.ExecutorTaskPhaseNotStarted { rt.et.Status.Phase = types.ExecutorTaskPhaseCancelled } else { rt.et.Status.Phase = types.ExecutorTaskPhaseStopped } if err := e.sendExecutorTaskStatus(ctx, et); err != nil { log.Errorf("err: %+v", err) return } } } } func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) { // * save in local state that we have a running task // * start the pod // * then update the executortask status to in-progress // if something fails pod will be cleaned up by the pod cleaner goroutine // In this way we are sure that the pod cleaner will only remove pod that don't // have an in progress running task if et.Status.Phase != types.ExecutorTaskPhaseNotStarted { log.Debugf("task phase is not \"not started\"") return } rt := &runningTask{ et: et, } rt.Lock() if !e.runningTasks.addIfNotExists(et.ID, rt) { log.Debugf("task %s already running", et.ID) return } defer e.runningTasks.delete(et.ID) et.Status.Phase = types.ExecutorTaskPhaseRunning et.Status.StartTime = util.TimePtr(time.Now()) et.Status.SetupStep.Phase = types.ExecutorTaskPhaseRunning et.Status.SetupStep.StartTime = util.TimePtr(time.Now()) if err := e.sendExecutorTaskStatus(ctx, et); err != nil { log.Errorf("err: %+v", err) } if err := e.setupTask(ctx, rt); err != nil { rt.et.Status.Phase = types.ExecutorTaskPhaseFailed et.Status.SetupStep.EndTime = util.TimePtr(time.Now()) et.Status.SetupStep.Phase = types.ExecutorTaskPhaseFailed et.Status.SetupStep.EndTime = util.TimePtr(time.Now()) if err := e.sendExecutorTaskStatus(ctx, et); err != nil { log.Errorf("err: %+v", err) } rt.Unlock() return } et.Status.SetupStep.Phase = types.ExecutorTaskPhaseSuccess et.Status.SetupStep.EndTime = util.TimePtr(time.Now()) if err := e.sendExecutorTaskStatus(ctx, et); err != nil { log.Errorf("err: %+v", err) } rt.Unlock() _, err := e.executeTaskInternal(ctx, et, rt.pod) rt.Lock() if err != nil { log.Errorf("err: %+v", err) rt.et.Status.Phase = types.ExecutorTaskPhaseFailed } else { rt.et.Status.Phase = types.ExecutorTaskPhaseSuccess } rt.et.Status.EndTime = util.TimePtr(time.Now()) if err := e.sendExecutorTaskStatus(ctx, et); err != nil { log.Errorf("err: %+v", err) } rt.Unlock() } func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { et := rt.et if err := os.RemoveAll(e.taskPath(et.ID)); err != nil { return err } if err := os.MkdirAll(e.taskPath(et.ID), 0770); err != nil { return err } cmd := []string{toolboxContainerPath, "sleeper"} if et.Containers[0].Entrypoint != "" { cmd = strings.Split(et.Containers[0].Entrypoint, " ") log.Infof("cmd: %v", cmd) } log.Debugf("starting pod") registryAuth, err := registryAuthToken(et.Containers[0].Auth) if err != nil { return err } podConfig := &driver.PodConfig{ Labels: createTaskLabels(et.ID), InitVolumeDir: toolboxContainerDir, Containers: []*driver.ContainerConfig{ { Image: et.Containers[0].Image, Cmd: cmd, Env: et.Containers[0].Environment, WorkingDir: et.WorkingDir, User: et.Containers[0].User, Privileged: et.Containers[0].Privileged, RegistryAuth: registryAuth, }, }, } setupLogPath := e.setupLogPath(et.ID) if err := os.MkdirAll(filepath.Dir(setupLogPath), 0770); err != nil { return err } outf, err := os.Create(setupLogPath) if err != nil { return err } defer outf.Close() outf.WriteString("Starting pod.\n") pod, err := e.driver.NewPod(ctx, podConfig, outf) if err != nil { outf.WriteString(fmt.Sprintf("Pod failed to start. Error: %s\n", err)) return err } outf.WriteString("Pod started.\n") rt.pod = pod return nil } func (e *Executor) executeTaskInternal(ctx context.Context, et *types.ExecutorTask, pod driver.Pod) (int, error) { log.Debugf("task: %s", et.TaskName) for i, step := range et.Steps { //log.Debugf("step: %v", util.Dump(step)) rt, ok := e.runningTasks.get(et.ID) if !ok { panic(errors.Errorf("not running task for task id %s, this should never happen", et.ID)) } rt.Lock() rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseRunning rt.et.Status.Steps[i].StartTime = util.TimePtr(time.Now()) if err := e.sendExecutorTaskStatus(ctx, et); err != nil { log.Errorf("err: %+v", err) } rt.Unlock() var err error var exitCode int var stepName string switch s := step.(type) { case *types.RunStep: log.Debugf("run step: %s", util.Dump(s)) stepName = s.Name exitCode, err = e.doRunStep(ctx, s, et, pod, e.stepLogPath(et.ID, i)) case *types.SaveToWorkspaceStep: log.Debugf("save to workspace step: %s", util.Dump(s)) stepName = s.Name archivePath := e.archivePath(et.ID, i) exitCode, err = e.doSaveToWorkspaceStep(ctx, s, et, pod, e.stepLogPath(et.ID, i), archivePath) case *types.RestoreWorkspaceStep: log.Debugf("restore workspace step: %s", util.Dump(s)) stepName = s.Name exitCode, err = e.doRestoreWorkspaceStep(ctx, s, et, pod, e.stepLogPath(et.ID, i)) default: return i, errors.Errorf("unknown step type: %s", util.Dump(s)) } var serr error rt.Lock() rt.et.Status.Steps[i].EndTime = util.TimePtr(time.Now()) rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseSuccess if err != nil { if rt.et.Stop { rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseStopped } else { rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseFailed } serr = errors.Wrapf(err, "failed to execute step") } else if exitCode != 0 { rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseFailed rt.et.Status.Steps[i].ExitCode = exitCode serr = errors.Errorf("step %q failed with exitcode %d", stepName, exitCode) } if err := e.sendExecutorTaskStatus(ctx, et); err != nil { log.Errorf("err: %+v", err) } rt.Unlock() if serr != nil { return i, serr } } return 0, nil } func createAllLabels() map[string]string { return map[string]string{} } func createTaskLabels(taskID string) map[string]string { return map[string]string{ taskIDLabel: taskID, } } func (e *Executor) podsCleanerLoop(ctx context.Context) { for { log.Debugf("podsCleaner") if err := e.podsCleaner(ctx); err != nil { log.Errorf("err: %+v", err) } select { case <-ctx.Done(): return default: } time.Sleep(1 * time.Second) } } func (e *Executor) podsCleaner(ctx context.Context) error { pods, err := e.getAllPods(ctx, true) if err != nil { return err } for _, pod := range pods { taskID, ok := pod.Labels()[taskIDLabel] if !ok { continue } if _, ok := e.runningTasks.get(taskID); !ok { log.Infof("removing pod %s for not running task: %s", pod.ID(), taskID) pod.Remove(ctx) } } return nil } func (e *Executor) executorStatusSenderLoop(ctx context.Context) { for { log.Debugf("executorStatusSender") if err := e.sendExecutorStatus(ctx); err != nil { log.Errorf("err: %+v", err) } select { case <-ctx.Done(): return default: } time.Sleep(2 * time.Second) } } func (e *Executor) tasksCleanerLoop(ctx context.Context) { for { log.Debugf("tasksCleaner") if err := e.tasksCleaner(ctx); err != nil { log.Errorf("err: %+v", err) } select { case <-ctx.Done(): return default: } time.Sleep(2 * time.Second) } } func (e *Executor) tasksCleaner(ctx context.Context) error { ets, _, err := e.runserviceClient.GetExecutorTasks(ctx, e.id) if err != nil { log.Warnf("err: %v", err) return err } log.Debugf("ets: %v", util.Dump(ets)) for _, et := range ets { go e.cleanTask(ctx, et) } return nil } func (e *Executor) cleanTask(ctx context.Context, et *types.ExecutorTask) { log.Debugf("et: %v", util.Dump(et)) if et.Status.ExecutorID != e.id { return } if et.Stop { e.stopTask(ctx, et) } if et.Status.Phase == types.ExecutorTaskPhaseNotStarted { e.executeTask(ctx, et) } if et.Status.Phase == types.ExecutorTaskPhaseRunning { _, ok := e.runningTasks.get(et.ID) if !ok { log.Infof("marking executor task %s as failed since there's no running task", et.ID) et.Status.Phase = types.ExecutorTaskPhaseFailed // mark in progress step as failed too for _, s := range et.Status.Steps { if s.Phase == types.ExecutorTaskPhaseRunning { s.Phase = types.ExecutorTaskPhaseFailed s.EndTime = util.TimePtr(time.Now()) } } e.sendExecutorTaskStatus(ctx, et) } } } func (e *Executor) tasksDataCleanerLoop(ctx context.Context) { for { log.Debugf("tasksDataCleaner") if err := e.tasksDataCleaner(ctx); err != nil { log.Errorf("err: %+v", err) } select { case <-ctx.Done(): return default: } time.Sleep(2 * time.Second) } } func (e *Executor) tasksDataCleaner(ctx context.Context) error { entries, err := ioutil.ReadDir(e.tasksDir()) if err != nil { return err } for _, entry := range entries { if !entry.IsDir() { continue } etID := filepath.Base(entry.Name()) _, resp, err := e.runserviceClient.GetExecutorTask(ctx, e.id, etID) if err != nil { if resp == nil { return err } if resp.StatusCode != http.StatusNotFound { return err } } if resp.StatusCode == http.StatusNotFound { taskDir := filepath.Join(e.tasksDir(), entry.Name()) log.Infof("removing task dir %q", taskDir) // remove task dir if err := os.RemoveAll(taskDir); err != nil { return err } } } return nil } type runningTasks struct { tasks map[string]*runningTask m sync.Mutex } type runningTask struct { sync.Mutex et *types.ExecutorTask pod driver.Pod } func (r *runningTasks) get(rtID string) (*runningTask, bool) { r.m.Lock() defer r.m.Unlock() rt, ok := r.tasks[rtID] return rt, ok } func (r *runningTasks) addIfNotExists(rtID string, rt *runningTask) bool { r.m.Lock() defer r.m.Unlock() if _, ok := r.tasks[rtID]; ok { return false } r.tasks[rtID] = rt return true } func (r *runningTasks) add(rtID string, rt *runningTask) { r.m.Lock() defer r.m.Unlock() r.tasks[rtID] = rt } func (r *runningTasks) delete(rtID string) { r.m.Lock() defer r.m.Unlock() delete(r.tasks, rtID) } func (e *Executor) handleTasks(ctx context.Context, c <-chan *types.ExecutorTask) { for et := range c { go e.executeTask(ctx, et) } } func (e *Executor) getExecutorID() (string, error) { id, err := ioutil.ReadFile(e.executorIDPath()) if err != nil && !os.IsNotExist(err) { return "", err } return string(id), nil } func (e *Executor) saveExecutorID(id string) error { if err := common.WriteFileAtomic(e.executorIDPath(), []byte(id), 0660); err != nil { return errors.Wrapf(err, "failed to write executor id file") } return nil } type Executor struct { c *config.RunServiceExecutor runserviceClient *rsapi.Client id string runningTasks *runningTasks driver driver.Driver listenURL string } func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) { if c.Debug { level.SetLevel(zapcore.DebugLevel) } var err error c.ToolboxPath, err = filepath.Abs(c.ToolboxPath) if err != nil { return nil, errors.Wrapf(err, "cannot find \"agola-toolbox\" absolute path") } if c.ToolboxPath == "" { path, err := exec.LookPath("agola-toolbox") if err != nil { return nil, errors.Errorf("cannot find \"agola-toolbox\" binaries in PATH, agola-toolbox path must be explicitly provided") } c.ToolboxPath = path } dockerDriver, err := driver.NewDockerDriver(logger, "/tmp/agola/bin") if err != nil { return nil, errors.Wrapf(err, "failed to create docker client") } e := &Executor{ c: c, runserviceClient: rsapi.NewClient(c.RunServiceURL), driver: dockerDriver, runningTasks: &runningTasks{ tasks: make(map[string]*runningTask), }, } if err := os.MkdirAll(e.tasksDir(), 0770); err != nil { return nil, err } id, err := e.getExecutorID() if err != nil { return nil, err } if id == "" { id = uuid.NewV4().String() if err := e.saveExecutorID(id); err != nil { return nil, err } } e.id = id addr, err := sockaddr.GetPrivateIP() if err != nil { return nil, errors.Wrapf(err, "cannot discover executor listen address") } if addr == "" { return nil, errors.Errorf("cannot discover executor listen address") } u := url.URL{Scheme: "http"} if c.Web.TLS { u.Scheme = "https" } _, port, err := net.SplitHostPort(c.Web.ListenAddress) if err != nil { return nil, errors.Wrapf(err, "cannot get web listen port") } u.Host = net.JoinHostPort(addr, port) e.listenURL = u.String() return e, nil } func (e *Executor) Run(ctx context.Context) error { if err := e.driver.(*driver.DockerDriver).CopyToolbox(context.TODO(), e.c.ToolboxPath); err != nil { return err } ch := make(chan *types.ExecutorTask) schedulerHandler := NewTaskSubmissionHandler(ch) logsHandler := NewLogsHandler(logger, e) archivesHandler := NewArchivesHandler(e) router := mux.NewRouter() apirouter := router.PathPrefix("/api/v1alpha").Subrouter() apirouter.Handle("/executor", schedulerHandler).Methods("POST") apirouter.Handle("/executor/logs", logsHandler).Methods("GET") apirouter.Handle("/executor/archives", archivesHandler).Methods("GET") go e.executorStatusSenderLoop(ctx) go e.podsCleanerLoop(ctx) go e.tasksCleanerLoop(ctx) go e.tasksDataCleanerLoop(ctx) go e.handleTasks(ctx, ch) httpServer := http.Server{ Addr: e.c.Web.ListenAddress, Handler: apirouter, } lerrCh := make(chan error) go func() { lerrCh <- httpServer.ListenAndServe() }() select { case <-ctx.Done(): log.Infof("runservice executor exiting") httpServer.Close() return nil case err := <-lerrCh: log.Errorf("http server listen error: %v", err) return err } }