diff --git a/internal/services/executor/driver/docker.go b/internal/services/executor/driver/docker.go index 375ab03..19d1c5a 100644 --- a/internal/services/executor/driver/docker.go +++ b/internal/services/executor/driver/docker.go @@ -139,67 +139,30 @@ func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io. return nil, errors.Errorf("empty container config") } - containerConfig := podConfig.Containers[0] + var mainContainerID string + for cindex := range podConfig.Containers { + resp, err := d.createContainer(ctx, cindex, podConfig, mainContainerID, out) + if err != nil { + return nil, err + } - regName, err := registry.GetRegistry(containerConfig.Image) - if err != nil { - return nil, err - } - var registryAuth registry.DockerConfigAuth - if podConfig.DockerConfig != nil { - if regauth, ok := podConfig.DockerConfig.Auths[regName]; ok { - registryAuth = regauth + containerID := resp.ID + if cindex == 0 { + // save the maincontainerid + mainContainerID = containerID + } + + if err := d.client.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil { + return nil, err } } - buf, err := json.Marshal(registryAuth) - if err != nil { - return nil, err - } - registryAuthEnc := base64.URLEncoding.EncodeToString(buf) - - // by default always try to pull the image so we are sure only authorized users can fetch them - // see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages - reader, err := d.client.ImagePull(ctx, containerConfig.Image, types.ImagePullOptions{RegistryAuth: registryAuthEnc}) - if err != nil { - return nil, err - } - io.Copy(out, reader) - - labels := map[string]string{} - labels[agolaLabelKey] = agolaLabelValue - labels[podIDKey] = podConfig.ID - labels[taskIDKey] = podConfig.TaskID - - containerLabels := map[string]string{} - for k, v := range labels { - containerLabels[k] = v - } - containerLabels[containerIndexKey] = "0" - - resp, err := d.client.ContainerCreate(ctx, &container.Config{ - Entrypoint: containerConfig.Cmd, - Env: makeEnvSlice(containerConfig.Env), - WorkingDir: containerConfig.WorkingDir, - Image: containerConfig.Image, - Tty: true, - Labels: containerLabels, - }, &container.HostConfig{ - Binds: []string{fmt.Sprintf("%s:%s", d.initVolumeHostDir, podConfig.InitVolumeDir)}, - ReadonlyPaths: []string{fmt.Sprintf("%s:%s", d.initVolumeHostDir, podConfig.InitVolumeDir)}, - Privileged: containerConfig.Privileged, - }, nil, "") - if err != nil { - return nil, err - } - - containerID := resp.ID - - if err := d.client.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil { - return nil, err - } + searchLabels := map[string]string{} + searchLabels[agolaLabelKey] = agolaLabelValue + searchLabels[podIDKey] = podConfig.ID + searchLabels[taskIDKey] = podConfig.TaskID args := filters.NewArgs() - for k, v := range labels { + for k, v := range searchLabels { args.Add("label", fmt.Sprintf("%s=%s", k, v)) } @@ -211,15 +174,114 @@ func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io. return nil, err } if len(containers) == 0 { - return nil, errors.Errorf("no container with id %s", containerID) + return nil, errors.Errorf("no container with labels %s", searchLabels) } - return &DockerPod{ + pod := &DockerPod{ id: podConfig.ID, client: d.client, - containers: containers, executorID: d.executorID, - }, nil + containers: make([]types.Container, len(containers)), + } + + // Put the containers in the right order based on their containerIndexKey label value + count := 0 + seenIndexes := map[int]struct{}{} + for _, container := range containers { + cIndexStr, ok := container.Labels[containerIndexKey] + if !ok { + // ignore container + continue + } + cIndex, err := strconv.Atoi(cIndexStr) + if err != nil { + // ignore container + continue + } + if _, ok := seenIndexes[cIndex]; ok { + return nil, errors.Errorf("duplicate container with index %d", cIndex) + } + pod.containers[cIndex] = container + + seenIndexes[cIndex] = struct{}{} + count++ + } + if count != len(containers) { + return nil, errors.Errorf("expected %d containers but got %d", len(containers), count) + } + + return pod, nil +} + +func (d *DockerDriver) fetchImage(ctx context.Context, image string, registryConfig *registry.DockerConfig, out io.Writer) error { + regName, err := registry.GetRegistry(image) + if err != nil { + return err + } + var registryAuth registry.DockerConfigAuth + if registryConfig != nil { + if regauth, ok := registryConfig.Auths[regName]; ok { + registryAuth = regauth + } + } + buf, err := json.Marshal(registryAuth) + if err != nil { + return err + } + registryAuthEnc := base64.URLEncoding.EncodeToString(buf) + + // by default always try to pull the image so we are sure only authorized users can fetch them + // see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages + reader, err := d.client.ImagePull(ctx, image, types.ImagePullOptions{RegistryAuth: registryAuthEnc}) + if err != nil { + return err + } + + _, err = io.Copy(out, reader) + return err +} + +func (d *DockerDriver) createContainer(ctx context.Context, index int, podConfig *PodConfig, maincontainerID string, out io.Writer) (*container.ContainerCreateCreatedBody, error) { + containerConfig := podConfig.Containers[index] + + if err := d.fetchImage(ctx, containerConfig.Image, podConfig.DockerConfig, out); err != nil { + return nil, err + } + + labels := map[string]string{} + labels[agolaLabelKey] = agolaLabelValue + labels[podIDKey] = podConfig.ID + labels[taskIDKey] = podConfig.TaskID + + containerLabels := map[string]string{} + for k, v := range labels { + containerLabels[k] = v + } + containerLabels[containerIndexKey] = strconv.Itoa(index) + + cliContainerConfig := &container.Config{ + Entrypoint: containerConfig.Cmd, + Env: makeEnvSlice(containerConfig.Env), + WorkingDir: containerConfig.WorkingDir, + Image: containerConfig.Image, + Tty: true, + Labels: containerLabels, + } + + cliHostConfig := &container.HostConfig{ + Privileged: containerConfig.Privileged, + } + if index == 0 { + // main container requires the initvolume containing the toolbox + cliHostConfig.Binds = []string{fmt.Sprintf("%s:%s", d.initVolumeHostDir, podConfig.InitVolumeDir)} + cliHostConfig.ReadonlyPaths = []string{fmt.Sprintf("%s:%s", d.initVolumeHostDir, podConfig.InitVolumeDir)} + } else { + // attach other containers to maincontainer network + cliHostConfig.NetworkMode = container.NetworkMode(fmt.Sprintf("container:%s", maincontainerID)) + } + + resp, err := d.client.ContainerCreate(ctx, cliContainerConfig, cliHostConfig, nil, "") + return &resp, err } func (d *DockerDriver) ExecutorGroup(ctx context.Context) (string, error) { diff --git a/internal/services/executor/driver/docker_test.go b/internal/services/executor/driver/docker_test.go index 20159b0..dadfd15 100644 --- a/internal/services/executor/driver/docker_test.go +++ b/internal/services/executor/driver/docker_test.go @@ -24,6 +24,7 @@ import ( "os" "strings" "testing" + "time" "unicode" uuid "github.com/satori/go.uuid" @@ -203,7 +204,67 @@ func TestDockerPod(t *testing.T) { } }) - t.Run("test get pods", func(t *testing.T) { + t.Run("create a pod with two containers", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + &ContainerConfig{ + Image: "nginx:1.16", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + }) + + t.Run("test communication between two containers", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + &ContainerConfig{ + Image: "nginx:1.16", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + + // wait for nginx up + time.Sleep(1 * time.Second) + + ce, err := pod.Exec(ctx, &ExecConfig{ + Cmd: []string{"nc", "-z", "localhost", "80"}, + }) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + code, err := ce.Wait(ctx) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if code != 0 { + t.Fatalf("unexpected exit code: %d", code) + } + }) + + t.Run("test get pods single container", func(t *testing.T) { pod, err := d.NewPod(ctx, &PodConfig{ ID: uuid.NewV4().String(), TaskID: uuid.NewV4().String(), @@ -246,4 +307,49 @@ func TestDockerPod(t *testing.T) { } }) + t.Run("test get pods two containers", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + &ContainerConfig{ + Image: "nginx:1.16", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + + pods, err := d.GetPods(ctx, true) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + ok := false + for _, p := range pods { + if p.ID() == pod.ID() { + ok = true + ip := pod.(*DockerPod) + dp := p.(*DockerPod) + for i, c := range dp.containers { + if c.ID != ip.containers[i].ID { + t.Fatalf("different pod id, want: %s, got: %s", ip.id, dp.id) + } + if diff := cmp.Diff(ip.containers[i], c); diff != "" { + t.Error(diff) + } + } + } + } + if !ok { + t.Fatalf("pod with id %q not found", pod.ID()) + } + }) } diff --git a/internal/services/executor/driver/k8s.go b/internal/services/executor/driver/k8s.go index 6a55b45..f1c2326 100644 --- a/internal/services/executor/driver/k8s.go +++ b/internal/services/executor/driver/k8s.go @@ -292,8 +292,6 @@ func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Wri return nil, errors.Errorf("empty container config") } - containerConfig := podConfig.Containers[0] - secretClient := d.client.CoreV1().Secrets(d.namespace) podClient := d.client.CoreV1().Pods(d.namespace) @@ -355,30 +353,7 @@ func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Wri }, }, }, - Containers: []corev1.Container{ - { - Name: mainContainerName, - Image: containerConfig.Image, - Command: containerConfig.Cmd[0:1], - Args: containerConfig.Cmd[1:], - Env: genEnvVars(containerConfig.Env), - Stdin: true, - WorkingDir: containerConfig.WorkingDir, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "agolavolume", - MountPath: podConfig.InitVolumeDir, - ReadOnly: true, - }, - }, - // by default always try to pull the image so we are sure only authorized users can fetch them - // see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages - ImagePullPolicy: corev1.PullAlways, - SecurityContext: &corev1.SecurityContext{ - Privileged: &containerConfig.Privileged, - }, - }, - }, + Containers: []corev1.Container{}, Volumes: []corev1.Volume{ { Name: "agolavolume", @@ -390,6 +365,41 @@ func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Wri }, } + // define containers + for cIndex, containerConfig := range podConfig.Containers { + var containerName string + if cIndex == 0 { + containerName = mainContainerName + } else { + containerName = fmt.Sprintf("service%d", cIndex) + } + c := corev1.Container{ + Name: containerName, + Image: containerConfig.Image, + Command: containerConfig.Cmd, + Env: genEnvVars(containerConfig.Env), + Stdin: true, + WorkingDir: containerConfig.WorkingDir, + // by default always try to pull the image so we are sure only authorized users can fetch them + // see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages + ImagePullPolicy: corev1.PullAlways, + SecurityContext: &corev1.SecurityContext{ + Privileged: &containerConfig.Privileged, + }, + } + if cIndex == 0 { + // main container requires the initvolume containing the toolbox + c.VolumeMounts = []corev1.VolumeMount{ + { + Name: "agolavolume", + MountPath: podConfig.InitVolumeDir, + ReadOnly: true, + }, + } + } + pod.Spec.Containers = append(pod.Spec.Containers, c) + } + if podConfig.Arch != "" { pod.Spec.NodeSelector = map[string]string{ corev1.LabelArchStable: string(podConfig.Arch), diff --git a/internal/services/executor/driver/k8s_test.go b/internal/services/executor/driver/k8s_test.go index 260da68..ae568cc 100644 --- a/internal/services/executor/driver/k8s_test.go +++ b/internal/services/executor/driver/k8s_test.go @@ -20,6 +20,7 @@ import ( "io/ioutil" "os" "testing" + "time" uuid "github.com/satori/go.uuid" ) @@ -156,6 +157,68 @@ func TestK8sPod(t *testing.T) { } }) + t.Run("create a pod with two containers", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + &ContainerConfig{ + Image: "nginx:1.16", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + }) + + t.Run("test communication between two containers", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + &ContainerConfig{ + Image: "nginx:1.16", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + + // wait for nginx up + time.Sleep(1 * time.Second) + + var buf bytes.Buffer + ce, err := pod.Exec(ctx, &ExecConfig{ + Cmd: []string{"nc", "-z", "localhost", "80"}, + Stdout: &buf, + }) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + code, err := ce.Wait(ctx) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if code != 0 { + t.Fatalf("unexpected exit code: %d", code) + } + }) + t.Run("test get pods", func(t *testing.T) { pod, err := d.NewPod(ctx, &PodConfig{ ID: uuid.NewV4().String(), diff --git a/internal/services/executor/executor.go b/internal/services/executor/executor.go index bc331f2..e7ac219 100644 --- a/internal/services/executor/executor.go +++ b/internal/services/executor/executor.go @@ -822,12 +822,6 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { return err } - cmd := []string{toolboxContainerPath, "sleeper"} - if et.Containers[0].Entrypoint != "" { - cmd = strings.Split(et.Containers[0].Entrypoint, " ") - log.Infof("cmd: %v", cmd) - } - log.Debugf("starting pod") dockerConfig, err := registry.GenDockerConfig(et.DockerRegistriesAuth, []string{et.Containers[0].Image}) @@ -843,15 +837,24 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { Arch: et.Arch, InitVolumeDir: toolboxContainerDir, DockerConfig: dockerConfig, - Containers: []*driver.ContainerConfig{ - { - Image: et.Containers[0].Image, - Cmd: cmd, - Env: et.Containers[0].Environment, - User: et.Containers[0].User, - Privileged: et.Containers[0].Privileged, - }, - }, + Containers: make([]*driver.ContainerConfig, len(et.Containers)), + } + for i, c := range et.Containers { + var cmd []string + if i == 0 { + cmd = []string{toolboxContainerPath, "sleeper"} + } + if c.Entrypoint != "" { + cmd = strings.Split(c.Entrypoint, " ") + } + + podConfig.Containers[i] = &driver.ContainerConfig{ + Image: c.Image, + Cmd: cmd, + Env: c.Environment, + User: c.User, + Privileged: c.Privileged, + } } setupLogPath := e.setupLogPath(et.ID)