diff --git a/internal/services/runservice/executor/driver/docker.go b/internal/services/runservice/executor/driver/docker.go
index 9797646..8c3def3 100644
--- a/internal/services/runservice/executor/driver/docker.go
+++ b/internal/services/runservice/executor/driver/docker.go
@@ -43,9 +43,10 @@ type DockerDriver struct {
 	client            *client.Client
 	initVolumeHostDir string
 	toolboxPath       string
+	executorID        string
 }
 
-func NewDockerDriver(logger *zap.Logger, initVolumeHostDir, toolboxPath string) (*DockerDriver, error) {
+func NewDockerDriver(logger *zap.Logger, executorID, initVolumeHostDir, toolboxPath string) (*DockerDriver, error) {
 	cli, err := client.NewEnvClient()
 	if err != nil {
 		return nil, err
@@ -55,6 +56,7 @@ func NewDockerDriver(logger *zap.Logger, initVolumeHostDir, toolboxPath string)
 		client:            cli,
 		initVolumeHostDir: initVolumeHostDir,
 		toolboxPath:       toolboxPath,
+		executorID:        executorID,
 	}, nil
 }
 
@@ -149,12 +151,9 @@ func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.
 	io.Copy(out, reader)
 
 	labels := map[string]string{}
-	// prepend the podLabelPrefix to the labels' keys
-	for k, v := range podConfig.Labels {
-		labels[labelPrefix+k] = v
-	}
 	labels[agolaLabelKey] = agolaLabelValue
 	labels[podIDKey] = podConfig.ID
+	labels[taskIDKey] = podConfig.TaskID
 
 	containerLabels := map[string]string{}
 	for k, v := range labels {
@@ -204,15 +203,21 @@ func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.
 		id:         podConfig.ID,
 		client:     d.client,
 		containers: containers,
+		executorID: d.executorID,
 	}, nil
 }
 
-func (d *DockerDriver) GetPodsByLabels(ctx context.Context, labels map[string]string, all bool) ([]Pod, error) {
+func (d *DockerDriver) ExecutorGroup(ctx context.Context) (string, error) {
+	// use the same group as the executor id
+	return d.executorID, nil
+}
+
+func (d *DockerDriver) GetExecutors(ctx context.Context) ([]string, error) {
+	return []string{d.executorID}, nil
+}
+
+func (d *DockerDriver) GetPods(ctx context.Context, all bool) ([]Pod, error) {
 	args := filters.NewArgs()
-	// search label adding the podLabelPrefix
-	for k, v := range labels {
-		args.Add("label", fmt.Sprintf("%s%s=%s", labelPrefix, k, v))
-	}
 
 	containers, err := d.client.ContainerList(ctx,
 		types.ContainerListOptions{
@@ -235,6 +240,7 @@ func (d *DockerDriver) GetPodsByLabels(ctx context.Context, labels map[string]st
 			id:         podID,
 			client:     d.client,
 			containers: []types.Container{container},
+			executorID: d.executorID,
 		}
 		podsMap[podID] = pod
 
@@ -268,9 +274,10 @@ func (d *DockerDriver) GetPodsByLabels(ctx context.Context, labels map[string]st
 		// add labels from the container with index 0
 		if cIndex == 0 {
 			podLabels := map[string]string{}
+			// keep only labels starting with our prefix
 			for labelName, labelValue := range container.Labels {
 				if strings.HasPrefix(labelName, labelPrefix) {
-					podLabels[strings.TrimPrefix(labelName, labelPrefix)] = labelValue
+					podLabels[labelName] = labelValue
 				}
 			}
 			pod.labels = podLabels
@@ -286,9 +293,10 @@ func (d *DockerDriver) GetPodsByLabels(ctx context.Context, labels map[string]st
 
 func podLabelsFromContainer(containerLabels map[string]string) map[string]string {
 	labels := map[string]string{}
+	// keep only labels starting with our prefix
 	for k, v := range containerLabels {
 		if strings.HasPrefix(k, labelPrefix) {
-			labels[strings.TrimPrefix(k, labelPrefix)] = v
+			labels[k] = v
 		}
 	}
 	return labels
@@ -321,14 +329,19 @@ type DockerPod struct {
 	client     *client.Client
 	labels     map[string]string
 	containers []types.Container
+	executorID string
}
 
 func (dp *DockerPod) ID() string {
 	return dp.id
 }
 
-func (dp *DockerPod) Labels() map[string]string {
-	return dp.labels
+func (dp *DockerPod) ExecutorID() string {
+	return dp.executorID
+}
+
+func (dp *DockerPod) TaskID() string {
+	return dp.labels[taskIDKey]
 }
 
 func (dp *DockerPod) Stop(ctx context.Context) error {
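
The driver above now carries the owning executor's id instead of caller-supplied labels, so ownership and task association can be read straight off a pod. A usage sketch of the revised API within the agola module; the executor id and paths are placeholder values (mirroring the test below), not code from this patch:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/sorintlab/agola/internal/services/runservice/executor/driver"
	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	ctx := context.Background()

	// placeholder executor id, init volume dir and toolbox path
	d, err := driver.NewDockerDriver(logger, "executorid01", "/tmp/agola/bin", "/usr/local/bin/agola-toolbox")
	if err != nil {
		log.Fatal(err)
	}
	pods, err := d.GetPods(ctx, true)
	if err != nil {
		log.Fatal(err)
	}
	for _, pod := range pods {
		// each pod now reports its owner executor and its task directly
		fmt.Printf("pod %s: executor %s, task %s\n", pod.ID(), pod.ExecutorID(), pod.TaskID())
	}
}
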
diff --git a/internal/services/runservice/executor/driver/docker_test.go b/internal/services/runservice/executor/driver/docker_test.go
index 4cd55db..b9a5c0d 100644
--- a/internal/services/runservice/executor/driver/docker_test.go
+++ b/internal/services/runservice/executor/driver/docker_test.go
@@ -26,6 +26,7 @@ import (
 	"testing"
 	"unicode"
 
+	uuid "github.com/satori/go.uuid"
 	slog "github.com/sorintlab/agola/internal/log"
 
 	"github.com/google/go-cmp/cmp"
@@ -72,7 +73,7 @@ func parseEnvs(r io.Reader) (map[string]string, error) {
 	return envs, nil
 }
 
-func TestPod(t *testing.T) {
+func TestDockerPod(t *testing.T) {
 	if os.Getenv("SKIP_DOCKER_TESTS") == "1" {
 		t.Skip("skipping since env var SKIP_DOCKER_TESTS is 1")
 	}
@@ -87,7 +88,7 @@ func TestPod(t *testing.T) {
 	}
 	defer os.RemoveAll(dir)
 
-	d, err := NewDockerDriver(logger, dir, toolboxPath)
+	d, err := NewDockerDriver(logger, "executorid01", dir, toolboxPath)
 	if err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
@@ -96,6 +97,8 @@ func TestPod(t *testing.T) {
 
 	t.Run("create a pod with one container", func(t *testing.T) {
 		pod, err := d.NewPod(ctx, &PodConfig{
+			ID:     uuid.NewV4().String(),
+			TaskID: uuid.NewV4().String(),
 			Containers: []*ContainerConfig{
 				&ContainerConfig{
 					Cmd:   []string{"cat"},
@@ -107,12 +110,13 @@ func TestPod(t *testing.T) {
 		if err != nil {
 			t.Fatalf("unexpected err: %v", err)
 		}
-
 		defer pod.Remove(ctx)
 	})
 
 	t.Run("execute a command inside a pod", func(t *testing.T) {
 		pod, err := d.NewPod(ctx, &PodConfig{
+			ID:     uuid.NewV4().String(),
+			TaskID: uuid.NewV4().String(),
 			Containers: []*ContainerConfig{
 				&ContainerConfig{
 					Cmd:   []string{"cat"},
@@ -124,6 +128,7 @@ func TestPod(t *testing.T) {
 		if err != nil {
 			t.Fatalf("unexpected err: %v", err)
 		}
+		defer pod.Remove(ctx)
 
 		ce, err := pod.Exec(ctx, &ExecConfig{
 			Cmd: []string{"ls"},
@@ -138,10 +143,8 @@ func TestPod(t *testing.T) {
 			t.Fatalf("unexpected err: %v", err)
 		}
 		if code != 0 {
-			t.Fatalf("unexpected exito code: %d", code)
+			t.Fatalf("unexpected exit code: %d", code)
 		}
-
-		defer pod.Remove(ctx)
 	})
 
 	t.Run("test pod environment", func(t *testing.T) {
@@ -151,6 +154,8 @@ func TestPod(t *testing.T) {
 		}
 
 		pod, err := d.NewPod(ctx, &PodConfig{
+			ID:     uuid.NewV4().String(),
+			TaskID: uuid.NewV4().String(),
 			Containers: []*ContainerConfig{
 				&ContainerConfig{
 					Cmd:   []string{"cat"},
@@ -163,6 +168,7 @@ func TestPod(t *testing.T) {
 		if err != nil {
 			t.Fatalf("unexpected err: %v", err)
 		}
+		defer pod.Remove(ctx)
 
 		var buf bytes.Buffer
 		ce, err := pod.Exec(ctx, &ExecConfig{
@@ -197,12 +203,12 @@ func TestPod(t *testing.T) {
 				}
 			}
 		}
-
-		defer pod.Remove(ctx)
 	})
 
-	t.Run("test get pods by label", func(t *testing.T) {
+	t.Run("test get pods", func(t *testing.T) {
 		pod, err := d.NewPod(ctx, &PodConfig{
+			ID:     uuid.NewV4().String(),
+			TaskID: uuid.NewV4().String(),
 			Containers: []*ContainerConfig{
 				&ContainerConfig{
 					Cmd:   []string{"cat"},
@@ -214,8 +220,9 @@ func TestPod(t *testing.T) {
 		if err != nil {
 			t.Fatalf("unexpected err: %v", err)
 		}
+		defer pod.Remove(ctx)
 
-		pods, err := d.GetPodsByLabels(ctx, map[string]string{}, true)
+		pods, err := d.GetPods(ctx, true)
 		if err != nil {
 			t.Fatalf("unexpected err: %v", err)
 		}
@@ -239,8 +246,6 @@ func TestPod(t *testing.T) {
 		if !ok {
 			t.Fatalf("pod with id %q not found", pod.ID())
 		}
-
-		defer pod.Remove(ctx)
 	})
 }
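
Besides the new required ID/TaskID fields, the moved defer pod.Remove(ctx) calls are the substantive fix in this file: a defer written after assertion sites is never registered when t.Fatalf aborts the test body early, so failed tests leaked their pods. A minimal sketch of the ordering the tests now follow (same package as the tests above; the helper name, the os.Stdout writer and the elided pod contents are illustrative assumptions):

// register cleanup right after a successful NewPod, before any assertion
// can abort the test body
func testPodCleanup(ctx context.Context, t *testing.T, d Driver) {
	pod, err := d.NewPod(ctx, &PodConfig{
		ID:     uuid.NewV4().String(),
		TaskID: uuid.NewV4().String(),
		// Containers and InitVolumeDir as in the tests above
	}, os.Stdout)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	// had this defer come after a t.Fatalf call site, a failing assertion
	// would skip it and leak the pod
	defer pod.Remove(ctx)
}
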
diff --git a/internal/services/runservice/executor/driver/driver.go b/internal/services/runservice/executor/driver/driver.go
index a74326c..bca32ad 100644
--- a/internal/services/runservice/executor/driver/driver.go
+++ b/internal/services/runservice/executor/driver/driver.go
@@ -44,15 +44,19 @@ const (
 type Driver interface {
 	Setup(ctx context.Context) error
 	NewPod(ctx context.Context, podConfig *PodConfig, out io.Writer) (Pod, error)
-	GetPodsByLabels(ctx context.Context, labels map[string]string, all bool) ([]Pod, error)
+	GetPods(ctx context.Context, all bool) ([]Pod, error)
 	GetPodByID(ctx context.Context, containerID string) (Pod, error)
+	ExecutorGroup(ctx context.Context) (string, error)
+	GetExecutors(ctx context.Context) ([]string, error)
 }
 
 type Pod interface {
 	// ID returns the pod id
 	ID() string
-	// Labels returns the pod labels
-	Labels() map[string]string
+	// ExecutorID returns the pod owner executor id
+	ExecutorID() string
+	// TaskID returns the pod task id
+	TaskID() string
 	// Stop stops the pod
 	Stop(ctx context.Context) error
 	// Stop stops the pod
@@ -68,8 +72,8 @@ type ContainerExec interface {
 
 type PodConfig struct {
 	ID         string
+	TaskID     string
 	Containers []*ContainerConfig
-	Labels     map[string]string
 	// The container dir where the init volume will be mounted
 	InitVolumeDir string
 	DockerConfig  *registry.DockerConfig
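
ExecutorGroup and GetExecutors exist for drivers where several executors can share the same backing resources; the Docker driver degenerates to a group of one. A hypothetical sketch of how a shared-resource driver might satisfy the two new methods — the K8sDriver type, its fields and its discovery strategy are illustrative assumptions, not part of this patch:

package driver

import "context"

// K8sDriver is a hypothetical driver whose executors all operate on one
// k8s namespace; the type and fields are illustrative only
type K8sDriver struct {
	executorID string
	namespace  string
}

// executors operating on the same namespace form one group, so pods
// created by any of them can be cleaned up by the survivors
func (d *K8sDriver) ExecutorGroup(ctx context.Context) (string, error) {
	return d.namespace, nil
}

// a real implementation would discover live executors from shared state
// (e.g. a per-executor lease); a single-element list keeps the sketch short
func (d *K8sDriver) GetExecutors(ctx context.Context) ([]string, error) {
	return []string{d.executorID}, nil
}
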
diff --git a/internal/services/runservice/executor/executor.go b/internal/services/runservice/executor/executor.go
index de4f3ac..c4cd718 100644
--- a/internal/services/runservice/executor/executor.go
+++ b/internal/services/runservice/executor/executor.go
@@ -56,8 +56,6 @@ var log = logger.Sugar()
 const (
 	defaultShell = "/bin/sh -e"
 
-	taskIDLabel = "taskid"
-
 	toolboxContainerDir = "/mnt/agola"
 )
 
@@ -66,7 +64,7 @@ var (
 )
 
 func (e *Executor) getAllPods(ctx context.Context, all bool) ([]driver.Pod, error) {
-	return e.driver.GetPodsByLabels(ctx, createAllLabels(), all)
+	return e.driver.GetPods(ctx, all)
 }
 
 func (e *Executor) createFile(ctx context.Context, pod driver.Pod, command, user string, outf io.Writer) (string, error) {
@@ -160,7 +158,7 @@ func (e *Executor) doRunStep(ctx context.Context, s *types.RunStep, t *types.Exe
 
 	workingDir, err = e.expandDir(ctx, t, pod, outf, workingDir)
 	if err != nil {
-		outf.WriteString(fmt.Sprintf("Failed to expand working dir %q. Error: %s\n", workingDir, err))
+		outf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", workingDir, err))
 		return -1, err
 	}
 
@@ -210,7 +208,7 @@ func (e *Executor) doSaveToWorkspaceStep(ctx context.Context, s *types.SaveToWor
 
 	workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
 	if err != nil {
-		logf.WriteString(fmt.Sprintf("Failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
+		logf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
 		return -1, err
 	}
 
@@ -332,7 +330,7 @@ func (e *Executor) template(ctx context.Context, t *types.ExecutorTask, pod driv
 
 	workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
 	if err != nil {
-		io.WriteString(logf, fmt.Sprintf("Failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
+		io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
 		return "", err
 	}
 
@@ -378,7 +376,7 @@ func (e *Executor) unarchive(ctx context.Context, t *types.ExecutorTask, source
 
 	workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
 	if err != nil {
-		io.WriteString(logf, fmt.Sprintf("Failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
+		io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
 		return err
 	}
 
@@ -501,7 +499,7 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep,
 
 	workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
 	if err != nil {
-		io.WriteString(logf, fmt.Sprintf("Failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
+		io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
 		return -1, err
 	}
 
@@ -663,16 +661,45 @@ func (e *Executor) sendExecutorStatus(ctx context.Context) error {
 	arch := runtime.GOARCH
 	labels["arch"] = arch
 
+	executorGroup, err := e.driver.ExecutorGroup(ctx)
+	if err != nil {
+		return err
+	}
+	// report all the executors that are active OR that have some owned pods not yet removed
+	activeExecutors, err := e.driver.GetExecutors(ctx)
+	if err != nil {
+		return err
+	}
+	pods, err := e.driver.GetPods(ctx, true)
+	if err != nil {
+		return err
+	}
+
+	executorsMap := map[string]struct{}{}
+	for _, executorID := range activeExecutors {
+		executorsMap[executorID] = struct{}{}
+	}
+	for _, pod := range pods {
+		executorsMap[pod.ExecutorID()] = struct{}{}
+	}
+	siblingsExecutors := []string{}
+	for executorID := range executorsMap {
+		siblingsExecutors = append(siblingsExecutors, executorID)
+	}
+
 	executor := &types.Executor{
-		ID:               e.id,
-		ListenURL:        e.listenURL,
-		Labels:           labels,
-		ActiveTasksLimit: e.c.ActiveTasksLimit,
-		ActiveTasks:      activeTasks,
+		ID:                e.id,
+		ListenURL:         e.listenURL,
+		Labels:            labels,
+		ActiveTasksLimit:  e.c.ActiveTasksLimit,
+		ActiveTasks:       activeTasks,
+		Dynamic:           e.dynamic,
+		ExecutorGroup:     executorGroup,
+		SiblingsExecutors: siblingsExecutors,
 	}
 
 	log.Debugf("send executor status: %s", util.Dump(executor))
-	_, err := e.runserviceClient.SendExecutorStatus(ctx, executor)
+	_, err = e.runserviceClient.SendExecutorStatus(ctx, executor)
 	return err
 }
 
@@ -806,7 +833,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
 		// generate a random pod id (don't use task id for future ability to restart
 		// tasks failed to start and don't clash with existing pods)
 		ID:            uuid.NewV4().String(),
-		Labels:        createTaskLabels(et.ID),
+		TaskID:        et.ID,
 		InitVolumeDir: toolboxContainerDir,
 		DockerConfig:  dockerConfig,
 		Containers: []*driver.ContainerConfig{
@@ -938,16 +965,6 @@ func (e *Executor) executeTaskInternal(ctx context.Context, et *types.ExecutorTa
 	return 0, nil
 }
 
-func createAllLabels() map[string]string {
-	return map[string]string{}
-}
-
-func createTaskLabels(taskID string) map[string]string {
-	return map[string]string{
-		taskIDLabel: taskID,
-	}
-}
-
 func (e *Executor) podsCleanerLoop(ctx context.Context) {
 	for {
 		log.Debugf("podsCleaner")
@@ -971,13 +988,33 @@ func (e *Executor) podsCleaner(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
+	executors, err := e.driver.GetExecutors(ctx)
+	if err != nil {
+		return err
+	}
+	// always add ourselves to the executors
+	executors = append(executors, e.id)
+
 	for _, pod := range pods {
-		taskID, ok := pod.Labels()[taskIDLabel]
-		if !ok {
-			continue
+		taskID := pod.TaskID()
+		// clean our owned pods
+		if pod.ExecutorID() == e.id {
+			if _, ok := e.runningTasks.get(taskID); !ok {
+				log.Infof("removing pod %s for not running task: %s", pod.ID(), taskID)
+				pod.Remove(ctx)
+			}
 		}
-		if _, ok := e.runningTasks.get(taskID); !ok {
-			log.Infof("removing pod %s for not running task: %s", pod.ID(), taskID)
+
+		// if no executor owns the pod we'll delete it
+		owned := false
+		for _, executorID := range executors {
+			if pod.ExecutorID() == executorID {
+				owned = true
+				break
+			}
+		}
+		if !owned {
+			log.Infof("removing pod %s since it's not owned by any active executor", pod.ID())
 			pod.Remove(ctx)
 		}
 	}
@@ -1196,6 +1233,7 @@ type Executor struct {
 	runningTasks     *runningTasks
 	driver           driver.Driver
 	listenURL        string
+	dynamic          bool
 }
 
 func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) {
@@ -1216,15 +1254,9 @@ func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) {
 		c.ToolboxPath = path
 	}
 
-	dockerDriver, err := driver.NewDockerDriver(logger, "/tmp/agola/bin", c.ToolboxPath)
-	if err != nil {
-		return nil, errors.Wrapf(err, "failed to create docker client")
-	}
-
 	e := &Executor{
 		c:                c,
 		runserviceClient: rsapi.NewClient(c.RunServiceURL),
-		driver:           dockerDriver,
 		runningTasks: &runningTasks{
 			tasks: make(map[string]*runningTask),
 		},
@@ -1265,6 +1297,12 @@ func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) {
 	u.Host = net.JoinHostPort(addr, port)
 	e.listenURL = u.String()
 
+	d, err := driver.NewDockerDriver(logger, e.id, "/tmp/agola/bin", e.c.ToolboxPath)
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to create docker driver")
+	}
+	e.driver = d
+
 	return e, nil
 }
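
sendExecutorStatus reports as siblings the union of the currently active executors and the owners of every pod that still exists, so the scheduler keeps seeing a departed executor until its leftover pods are cleaned up. The same logic as a standalone helper; the siblings function name is hypothetical (the patch inlines this in sendExecutorStatus):

package main

import (
	"fmt"

	"github.com/sorintlab/agola/internal/services/runservice/executor/driver"
)

// siblings computes the union of active executor ids and the owners of
// all existing pods, deduplicated through a set
func siblings(activeExecutors []string, pods []driver.Pod) []string {
	executorsMap := map[string]struct{}{}
	for _, executorID := range activeExecutors {
		executorsMap[executorID] = struct{}{}
	}
	for _, pod := range pods {
		executorsMap[pod.ExecutorID()] = struct{}{}
	}
	out := make([]string, 0, len(executorsMap))
	for executorID := range executorsMap {
		out = append(out, executorID)
	}
	return out
}

func main() {
	// with no pods left, the siblings are just the active executors
	fmt.Println(siblings([]string{"executorid01"}, nil))
}
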
diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go
index e9d27dc..58935f9 100644
--- a/internal/services/runservice/types/types.go
+++ b/internal/services/runservice/types/types.go
@@ -669,6 +669,17 @@ type Executor struct {
 	ActiveTasksLimit int `json:"active_tasks_limit,omitempty"`
 	ActiveTasks      int `json:"active_tasks,omitempty"`
 
+	// Dynamic represents an executor that can be automatically removed since it's
+	// part of a group of executors managing the same resources (e.g. a k8s
+	// namespace managed by multiple executors that will automatically clean up
+	// pods owned by an old executor)
+	Dynamic bool `json:"dynamic,omitempty"`
+
+	// ExecutorGroup is the executor group to which this executor belongs
+	ExecutorGroup string `json:"executor_group,omitempty"`
+	// SiblingsExecutors are all the executors in the ExecutorGroup
+	SiblingsExecutors []string `json:"siblings_executors,omitempty"`
+
 	// internal values not saved
 	Revision int64 `json:"-"`
 }
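
For reference, a sketch of how the new fields appear on the wire when an executor reports its status; all ids below are made-up placeholders and the printed output is approximate:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/sorintlab/agola/internal/services/runservice/types"
)

func main() {
	// made-up ids: a dynamic executor reporting a departed sibling that
	// still owns pods in the same group
	executor := &types.Executor{
		ID:                "executorid01",
		Dynamic:           true,
		ExecutorGroup:     "executorid01",
		SiblingsExecutors: []string{"executorid01", "oldexecutorid00"},
	}
	out, _ := json.Marshal(executor)
	// prints, roughly: {...,"dynamic":true,"executor_group":"executorid01",
	// "siblings_executors":["executorid01","oldexecutorid00"]}
	fmt.Println(string(out))
}
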