From caa9d74b72063174f8943b00bd266c0a8f4d761c Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Fri, 10 May 2019 17:31:23 +0200 Subject: [PATCH] runservice: implement additional service containers This was already defined in the config but not implemented in the executor and drivers. All the containers defined in the runtime after the first one will be "service" containers. They will share the same network namespace with the other containers in the "pod" so they can communicate between themself on loopback --- internal/services/executor/driver/docker.go | 182 ++++++++++++------ .../services/executor/driver/docker_test.go | 108 ++++++++++- internal/services/executor/driver/k8s.go | 62 +++--- internal/services/executor/driver/k8s_test.go | 63 ++++++ internal/services/executor/executor.go | 33 ++-- 5 files changed, 346 insertions(+), 102 deletions(-) diff --git a/internal/services/executor/driver/docker.go b/internal/services/executor/driver/docker.go index 375ab03..19d1c5a 100644 --- a/internal/services/executor/driver/docker.go +++ b/internal/services/executor/driver/docker.go @@ -139,67 +139,30 @@ func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io. return nil, errors.Errorf("empty container config") } - containerConfig := podConfig.Containers[0] + var mainContainerID string + for cindex := range podConfig.Containers { + resp, err := d.createContainer(ctx, cindex, podConfig, mainContainerID, out) + if err != nil { + return nil, err + } - regName, err := registry.GetRegistry(containerConfig.Image) - if err != nil { - return nil, err - } - var registryAuth registry.DockerConfigAuth - if podConfig.DockerConfig != nil { - if regauth, ok := podConfig.DockerConfig.Auths[regName]; ok { - registryAuth = regauth + containerID := resp.ID + if cindex == 0 { + // save the maincontainerid + mainContainerID = containerID + } + + if err := d.client.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil { + return nil, err } } - buf, err := json.Marshal(registryAuth) - if err != nil { - return nil, err - } - registryAuthEnc := base64.URLEncoding.EncodeToString(buf) - - // by default always try to pull the image so we are sure only authorized users can fetch them - // see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages - reader, err := d.client.ImagePull(ctx, containerConfig.Image, types.ImagePullOptions{RegistryAuth: registryAuthEnc}) - if err != nil { - return nil, err - } - io.Copy(out, reader) - - labels := map[string]string{} - labels[agolaLabelKey] = agolaLabelValue - labels[podIDKey] = podConfig.ID - labels[taskIDKey] = podConfig.TaskID - - containerLabels := map[string]string{} - for k, v := range labels { - containerLabels[k] = v - } - containerLabels[containerIndexKey] = "0" - - resp, err := d.client.ContainerCreate(ctx, &container.Config{ - Entrypoint: containerConfig.Cmd, - Env: makeEnvSlice(containerConfig.Env), - WorkingDir: containerConfig.WorkingDir, - Image: containerConfig.Image, - Tty: true, - Labels: containerLabels, - }, &container.HostConfig{ - Binds: []string{fmt.Sprintf("%s:%s", d.initVolumeHostDir, podConfig.InitVolumeDir)}, - ReadonlyPaths: []string{fmt.Sprintf("%s:%s", d.initVolumeHostDir, podConfig.InitVolumeDir)}, - Privileged: containerConfig.Privileged, - }, nil, "") - if err != nil { - return nil, err - } - - containerID := resp.ID - - if err := d.client.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil { - return nil, err - } + searchLabels := map[string]string{} + searchLabels[agolaLabelKey] = agolaLabelValue + searchLabels[podIDKey] = podConfig.ID + searchLabels[taskIDKey] = podConfig.TaskID args := filters.NewArgs() - for k, v := range labels { + for k, v := range searchLabels { args.Add("label", fmt.Sprintf("%s=%s", k, v)) } @@ -211,15 +174,114 @@ func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io. return nil, err } if len(containers) == 0 { - return nil, errors.Errorf("no container with id %s", containerID) + return nil, errors.Errorf("no container with labels %s", searchLabels) } - return &DockerPod{ + pod := &DockerPod{ id: podConfig.ID, client: d.client, - containers: containers, executorID: d.executorID, - }, nil + containers: make([]types.Container, len(containers)), + } + + // Put the containers in the right order based on their containerIndexKey label value + count := 0 + seenIndexes := map[int]struct{}{} + for _, container := range containers { + cIndexStr, ok := container.Labels[containerIndexKey] + if !ok { + // ignore container + continue + } + cIndex, err := strconv.Atoi(cIndexStr) + if err != nil { + // ignore container + continue + } + if _, ok := seenIndexes[cIndex]; ok { + return nil, errors.Errorf("duplicate container with index %d", cIndex) + } + pod.containers[cIndex] = container + + seenIndexes[cIndex] = struct{}{} + count++ + } + if count != len(containers) { + return nil, errors.Errorf("expected %d containers but got %d", len(containers), count) + } + + return pod, nil +} + +func (d *DockerDriver) fetchImage(ctx context.Context, image string, registryConfig *registry.DockerConfig, out io.Writer) error { + regName, err := registry.GetRegistry(image) + if err != nil { + return err + } + var registryAuth registry.DockerConfigAuth + if registryConfig != nil { + if regauth, ok := registryConfig.Auths[regName]; ok { + registryAuth = regauth + } + } + buf, err := json.Marshal(registryAuth) + if err != nil { + return err + } + registryAuthEnc := base64.URLEncoding.EncodeToString(buf) + + // by default always try to pull the image so we are sure only authorized users can fetch them + // see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages + reader, err := d.client.ImagePull(ctx, image, types.ImagePullOptions{RegistryAuth: registryAuthEnc}) + if err != nil { + return err + } + + _, err = io.Copy(out, reader) + return err +} + +func (d *DockerDriver) createContainer(ctx context.Context, index int, podConfig *PodConfig, maincontainerID string, out io.Writer) (*container.ContainerCreateCreatedBody, error) { + containerConfig := podConfig.Containers[index] + + if err := d.fetchImage(ctx, containerConfig.Image, podConfig.DockerConfig, out); err != nil { + return nil, err + } + + labels := map[string]string{} + labels[agolaLabelKey] = agolaLabelValue + labels[podIDKey] = podConfig.ID + labels[taskIDKey] = podConfig.TaskID + + containerLabels := map[string]string{} + for k, v := range labels { + containerLabels[k] = v + } + containerLabels[containerIndexKey] = strconv.Itoa(index) + + cliContainerConfig := &container.Config{ + Entrypoint: containerConfig.Cmd, + Env: makeEnvSlice(containerConfig.Env), + WorkingDir: containerConfig.WorkingDir, + Image: containerConfig.Image, + Tty: true, + Labels: containerLabels, + } + + cliHostConfig := &container.HostConfig{ + Privileged: containerConfig.Privileged, + } + if index == 0 { + // main container requires the initvolume containing the toolbox + cliHostConfig.Binds = []string{fmt.Sprintf("%s:%s", d.initVolumeHostDir, podConfig.InitVolumeDir)} + cliHostConfig.ReadonlyPaths = []string{fmt.Sprintf("%s:%s", d.initVolumeHostDir, podConfig.InitVolumeDir)} + } else { + // attach other containers to maincontainer network + cliHostConfig.NetworkMode = container.NetworkMode(fmt.Sprintf("container:%s", maincontainerID)) + } + + resp, err := d.client.ContainerCreate(ctx, cliContainerConfig, cliHostConfig, nil, "") + return &resp, err } func (d *DockerDriver) ExecutorGroup(ctx context.Context) (string, error) { diff --git a/internal/services/executor/driver/docker_test.go b/internal/services/executor/driver/docker_test.go index 20159b0..dadfd15 100644 --- a/internal/services/executor/driver/docker_test.go +++ b/internal/services/executor/driver/docker_test.go @@ -24,6 +24,7 @@ import ( "os" "strings" "testing" + "time" "unicode" uuid "github.com/satori/go.uuid" @@ -203,7 +204,67 @@ func TestDockerPod(t *testing.T) { } }) - t.Run("test get pods", func(t *testing.T) { + t.Run("create a pod with two containers", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + &ContainerConfig{ + Image: "nginx:1.16", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + }) + + t.Run("test communication between two containers", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + &ContainerConfig{ + Image: "nginx:1.16", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + + // wait for nginx up + time.Sleep(1 * time.Second) + + ce, err := pod.Exec(ctx, &ExecConfig{ + Cmd: []string{"nc", "-z", "localhost", "80"}, + }) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + code, err := ce.Wait(ctx) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if code != 0 { + t.Fatalf("unexpected exit code: %d", code) + } + }) + + t.Run("test get pods single container", func(t *testing.T) { pod, err := d.NewPod(ctx, &PodConfig{ ID: uuid.NewV4().String(), TaskID: uuid.NewV4().String(), @@ -246,4 +307,49 @@ func TestDockerPod(t *testing.T) { } }) + t.Run("test get pods two containers", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + &ContainerConfig{ + Image: "nginx:1.16", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + + pods, err := d.GetPods(ctx, true) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + ok := false + for _, p := range pods { + if p.ID() == pod.ID() { + ok = true + ip := pod.(*DockerPod) + dp := p.(*DockerPod) + for i, c := range dp.containers { + if c.ID != ip.containers[i].ID { + t.Fatalf("different pod id, want: %s, got: %s", ip.id, dp.id) + } + if diff := cmp.Diff(ip.containers[i], c); diff != "" { + t.Error(diff) + } + } + } + } + if !ok { + t.Fatalf("pod with id %q not found", pod.ID()) + } + }) } diff --git a/internal/services/executor/driver/k8s.go b/internal/services/executor/driver/k8s.go index 6a55b45..f1c2326 100644 --- a/internal/services/executor/driver/k8s.go +++ b/internal/services/executor/driver/k8s.go @@ -292,8 +292,6 @@ func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Wri return nil, errors.Errorf("empty container config") } - containerConfig := podConfig.Containers[0] - secretClient := d.client.CoreV1().Secrets(d.namespace) podClient := d.client.CoreV1().Pods(d.namespace) @@ -355,30 +353,7 @@ func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Wri }, }, }, - Containers: []corev1.Container{ - { - Name: mainContainerName, - Image: containerConfig.Image, - Command: containerConfig.Cmd[0:1], - Args: containerConfig.Cmd[1:], - Env: genEnvVars(containerConfig.Env), - Stdin: true, - WorkingDir: containerConfig.WorkingDir, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "agolavolume", - MountPath: podConfig.InitVolumeDir, - ReadOnly: true, - }, - }, - // by default always try to pull the image so we are sure only authorized users can fetch them - // see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages - ImagePullPolicy: corev1.PullAlways, - SecurityContext: &corev1.SecurityContext{ - Privileged: &containerConfig.Privileged, - }, - }, - }, + Containers: []corev1.Container{}, Volumes: []corev1.Volume{ { Name: "agolavolume", @@ -390,6 +365,41 @@ func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Wri }, } + // define containers + for cIndex, containerConfig := range podConfig.Containers { + var containerName string + if cIndex == 0 { + containerName = mainContainerName + } else { + containerName = fmt.Sprintf("service%d", cIndex) + } + c := corev1.Container{ + Name: containerName, + Image: containerConfig.Image, + Command: containerConfig.Cmd, + Env: genEnvVars(containerConfig.Env), + Stdin: true, + WorkingDir: containerConfig.WorkingDir, + // by default always try to pull the image so we are sure only authorized users can fetch them + // see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages + ImagePullPolicy: corev1.PullAlways, + SecurityContext: &corev1.SecurityContext{ + Privileged: &containerConfig.Privileged, + }, + } + if cIndex == 0 { + // main container requires the initvolume containing the toolbox + c.VolumeMounts = []corev1.VolumeMount{ + { + Name: "agolavolume", + MountPath: podConfig.InitVolumeDir, + ReadOnly: true, + }, + } + } + pod.Spec.Containers = append(pod.Spec.Containers, c) + } + if podConfig.Arch != "" { pod.Spec.NodeSelector = map[string]string{ corev1.LabelArchStable: string(podConfig.Arch), diff --git a/internal/services/executor/driver/k8s_test.go b/internal/services/executor/driver/k8s_test.go index 260da68..ae568cc 100644 --- a/internal/services/executor/driver/k8s_test.go +++ b/internal/services/executor/driver/k8s_test.go @@ -20,6 +20,7 @@ import ( "io/ioutil" "os" "testing" + "time" uuid "github.com/satori/go.uuid" ) @@ -156,6 +157,68 @@ func TestK8sPod(t *testing.T) { } }) + t.Run("create a pod with two containers", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + &ContainerConfig{ + Image: "nginx:1.16", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + }) + + t.Run("test communication between two containers", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + &ContainerConfig{ + Image: "nginx:1.16", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + + // wait for nginx up + time.Sleep(1 * time.Second) + + var buf bytes.Buffer + ce, err := pod.Exec(ctx, &ExecConfig{ + Cmd: []string{"nc", "-z", "localhost", "80"}, + Stdout: &buf, + }) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + code, err := ce.Wait(ctx) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if code != 0 { + t.Fatalf("unexpected exit code: %d", code) + } + }) + t.Run("test get pods", func(t *testing.T) { pod, err := d.NewPod(ctx, &PodConfig{ ID: uuid.NewV4().String(), diff --git a/internal/services/executor/executor.go b/internal/services/executor/executor.go index bc331f2..e7ac219 100644 --- a/internal/services/executor/executor.go +++ b/internal/services/executor/executor.go @@ -822,12 +822,6 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { return err } - cmd := []string{toolboxContainerPath, "sleeper"} - if et.Containers[0].Entrypoint != "" { - cmd = strings.Split(et.Containers[0].Entrypoint, " ") - log.Infof("cmd: %v", cmd) - } - log.Debugf("starting pod") dockerConfig, err := registry.GenDockerConfig(et.DockerRegistriesAuth, []string{et.Containers[0].Image}) @@ -843,15 +837,24 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { Arch: et.Arch, InitVolumeDir: toolboxContainerDir, DockerConfig: dockerConfig, - Containers: []*driver.ContainerConfig{ - { - Image: et.Containers[0].Image, - Cmd: cmd, - Env: et.Containers[0].Environment, - User: et.Containers[0].User, - Privileged: et.Containers[0].Privileged, - }, - }, + Containers: make([]*driver.ContainerConfig, len(et.Containers)), + } + for i, c := range et.Containers { + var cmd []string + if i == 0 { + cmd = []string{toolboxContainerPath, "sleeper"} + } + if c.Entrypoint != "" { + cmd = strings.Split(c.Entrypoint, " ") + } + + podConfig.Containers[i] = &driver.ContainerConfig{ + Image: c.Image, + Cmd: cmd, + Env: c.Environment, + User: c.User, + Privileged: c.Privileged, + } } setupLogPath := e.setupLogPath(et.ID)