runservice: implement additional service containers

This was already defined in the config but not implemented in the executor and
drivers.

All the containers defined in the runtime after the first one will be "service"
containers. They will share the same network namespace with the other containers
in the "pod" so they can communicate with each other over the loopback interface.
This commit is contained in:
Simone Gotti 2019-05-10 17:31:23 +02:00
parent 7adca3ea30
commit caa9d74b72
5 changed files with 346 additions and 102 deletions

View File

@ -139,67 +139,30 @@ func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.
return nil, errors.Errorf("empty container config")
}
containerConfig := podConfig.Containers[0]
var mainContainerID string
for cindex := range podConfig.Containers {
resp, err := d.createContainer(ctx, cindex, podConfig, mainContainerID, out)
if err != nil {
return nil, err
}
regName, err := registry.GetRegistry(containerConfig.Image)
if err != nil {
return nil, err
}
var registryAuth registry.DockerConfigAuth
if podConfig.DockerConfig != nil {
if regauth, ok := podConfig.DockerConfig.Auths[regName]; ok {
registryAuth = regauth
containerID := resp.ID
if cindex == 0 {
// save the maincontainerid
mainContainerID = containerID
}
if err := d.client.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil {
return nil, err
}
}
buf, err := json.Marshal(registryAuth)
if err != nil {
return nil, err
}
registryAuthEnc := base64.URLEncoding.EncodeToString(buf)
// by default always try to pull the image so we are sure only authorized users can fetch them
// see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages
reader, err := d.client.ImagePull(ctx, containerConfig.Image, types.ImagePullOptions{RegistryAuth: registryAuthEnc})
if err != nil {
return nil, err
}
io.Copy(out, reader)
labels := map[string]string{}
labels[agolaLabelKey] = agolaLabelValue
labels[podIDKey] = podConfig.ID
labels[taskIDKey] = podConfig.TaskID
containerLabels := map[string]string{}
for k, v := range labels {
containerLabels[k] = v
}
containerLabels[containerIndexKey] = "0"
resp, err := d.client.ContainerCreate(ctx, &container.Config{
Entrypoint: containerConfig.Cmd,
Env: makeEnvSlice(containerConfig.Env),
WorkingDir: containerConfig.WorkingDir,
Image: containerConfig.Image,
Tty: true,
Labels: containerLabels,
}, &container.HostConfig{
Binds: []string{fmt.Sprintf("%s:%s", d.initVolumeHostDir, podConfig.InitVolumeDir)},
ReadonlyPaths: []string{fmt.Sprintf("%s:%s", d.initVolumeHostDir, podConfig.InitVolumeDir)},
Privileged: containerConfig.Privileged,
}, nil, "")
if err != nil {
return nil, err
}
containerID := resp.ID
if err := d.client.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil {
return nil, err
}
searchLabels := map[string]string{}
searchLabels[agolaLabelKey] = agolaLabelValue
searchLabels[podIDKey] = podConfig.ID
searchLabels[taskIDKey] = podConfig.TaskID
args := filters.NewArgs()
for k, v := range labels {
for k, v := range searchLabels {
args.Add("label", fmt.Sprintf("%s=%s", k, v))
}
@ -211,15 +174,114 @@ func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.
return nil, err
}
if len(containers) == 0 {
return nil, errors.Errorf("no container with id %s", containerID)
return nil, errors.Errorf("no container with labels %s", searchLabels)
}
return &DockerPod{
pod := &DockerPod{
id: podConfig.ID,
client: d.client,
containers: containers,
executorID: d.executorID,
}, nil
containers: make([]types.Container, len(containers)),
}
// Put the containers in the right order based on their containerIndexKey label value
count := 0
seenIndexes := map[int]struct{}{}
for _, container := range containers {
cIndexStr, ok := container.Labels[containerIndexKey]
if !ok {
// ignore container
continue
}
cIndex, err := strconv.Atoi(cIndexStr)
if err != nil {
// ignore container
continue
}
if _, ok := seenIndexes[cIndex]; ok {
return nil, errors.Errorf("duplicate container with index %d", cIndex)
}
pod.containers[cIndex] = container
seenIndexes[cIndex] = struct{}{}
count++
}
if count != len(containers) {
return nil, errors.Errorf("expected %d containers but got %d", len(containers), count)
}
return pod, nil
}
// fetchImage pulls the given image, using the matching registry auth from
// registryConfig (if any), and streams the pull progress output to out.
// It returns an error if the registry name cannot be derived from the image,
// if the auth cannot be encoded or if the pull itself fails.
func (d *DockerDriver) fetchImage(ctx context.Context, image string, registryConfig *registry.DockerConfig, out io.Writer) error {
	regName, err := registry.GetRegistry(image)
	if err != nil {
		return err
	}
	// look up credentials for this image's registry; zero value means
	// an anonymous pull
	var registryAuth registry.DockerConfigAuth
	if registryConfig != nil {
		if regauth, ok := registryConfig.Auths[regName]; ok {
			registryAuth = regauth
		}
	}
	// the docker API expects the auth config as base64(JSON)
	buf, err := json.Marshal(registryAuth)
	if err != nil {
		return err
	}
	registryAuthEnc := base64.URLEncoding.EncodeToString(buf)
	// by default always try to pull the image so we are sure only authorized users can fetch them
	// see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages
	reader, err := d.client.ImagePull(ctx, image, types.ImagePullOptions{RegistryAuth: registryAuthEnc})
	if err != nil {
		return err
	}
	// ImagePull returns an io.ReadCloser: close it so the underlying
	// connection isn't leaked
	defer reader.Close()
	// drain the progress stream; the pull isn't complete until EOF
	_, err = io.Copy(out, reader)
	return err
}
// createContainer creates (but does not start) the container at the given
// index of podConfig.Containers, pulling its image first. The container at
// index 0 is the "main" container and gets the init volume containing the
// toolbox; every other container is a "service" container attached to the
// main container's network namespace (identified by maincontainerID) so all
// pod containers can communicate over loopback. Pull progress is written to
// out.
func (d *DockerDriver) createContainer(ctx context.Context, index int, podConfig *PodConfig, maincontainerID string, out io.Writer) (*container.ContainerCreateCreatedBody, error) {
	containerConfig := podConfig.Containers[index]
	if err := d.fetchImage(ctx, containerConfig.Image, podConfig.DockerConfig, out); err != nil {
		return nil, err
	}
	labels := map[string]string{}
	labels[agolaLabelKey] = agolaLabelValue
	labels[podIDKey] = podConfig.ID
	labels[taskIDKey] = podConfig.TaskID
	containerLabels := map[string]string{}
	for k, v := range labels {
		containerLabels[k] = v
	}
	// the index label is used later to restore the pod's container ordering
	containerLabels[containerIndexKey] = strconv.Itoa(index)
	cliContainerConfig := &container.Config{
		Entrypoint: containerConfig.Cmd,
		Env:        makeEnvSlice(containerConfig.Env),
		WorkingDir: containerConfig.WorkingDir,
		Image:      containerConfig.Image,
		Tty:        true,
		Labels:     containerLabels,
	}
	cliHostConfig := &container.HostConfig{
		Privileged: containerConfig.Privileged,
	}
	if index == 0 {
		// main container requires the initvolume containing the toolbox
		cliHostConfig.Binds = []string{fmt.Sprintf("%s:%s", d.initVolumeHostDir, podConfig.InitVolumeDir)}
		cliHostConfig.ReadonlyPaths = []string{fmt.Sprintf("%s:%s", d.initVolumeHostDir, podConfig.InitVolumeDir)}
	} else {
		// attach other containers to maincontainer network
		cliHostConfig.NetworkMode = container.NetworkMode(fmt.Sprintf("container:%s", maincontainerID))
	}
	resp, err := d.client.ContainerCreate(ctx, cliContainerConfig, cliHostConfig, nil, "")
	if err != nil {
		// don't return a partially populated response alongside an error
		return nil, err
	}
	return &resp, nil
}
func (d *DockerDriver) ExecutorGroup(ctx context.Context) (string, error) {

View File

@ -24,6 +24,7 @@ import (
"os"
"strings"
"testing"
"time"
"unicode"
uuid "github.com/satori/go.uuid"
@ -203,7 +204,67 @@ func TestDockerPod(t *testing.T) {
}
})
t.Run("test get pods", func(t *testing.T) {
t.Run("create a pod with two containers", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
Image: "busybox",
},
&ContainerConfig{
Image: "nginx:1.16",
},
},
InitVolumeDir: "/tmp/agola",
}, ioutil.Discard)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
})
t.Run("test communication between two containers", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
Image: "busybox",
},
&ContainerConfig{
Image: "nginx:1.16",
},
},
InitVolumeDir: "/tmp/agola",
}, ioutil.Discard)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
// wait for nginx up
time.Sleep(1 * time.Second)
ce, err := pod.Exec(ctx, &ExecConfig{
Cmd: []string{"nc", "-z", "localhost", "80"},
})
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
code, err := ce.Wait(ctx)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if code != 0 {
t.Fatalf("unexpected exit code: %d", code)
}
})
t.Run("test get pods single container", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
@ -246,4 +307,49 @@ func TestDockerPod(t *testing.T) {
}
})
t.Run("test get pods two containers", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
Image: "busybox",
},
&ContainerConfig{
Image: "nginx:1.16",
},
},
InitVolumeDir: "/tmp/agola",
}, ioutil.Discard)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
pods, err := d.GetPods(ctx, true)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
ok := false
for _, p := range pods {
if p.ID() == pod.ID() {
ok = true
ip := pod.(*DockerPod)
dp := p.(*DockerPod)
for i, c := range dp.containers {
if c.ID != ip.containers[i].ID {
t.Fatalf("different pod id, want: %s, got: %s", ip.id, dp.id)
}
if diff := cmp.Diff(ip.containers[i], c); diff != "" {
t.Error(diff)
}
}
}
}
if !ok {
t.Fatalf("pod with id %q not found", pod.ID())
}
})
}

View File

@ -292,8 +292,6 @@ func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Wri
return nil, errors.Errorf("empty container config")
}
containerConfig := podConfig.Containers[0]
secretClient := d.client.CoreV1().Secrets(d.namespace)
podClient := d.client.CoreV1().Pods(d.namespace)
@ -355,30 +353,7 @@ func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Wri
},
},
},
Containers: []corev1.Container{
{
Name: mainContainerName,
Image: containerConfig.Image,
Command: containerConfig.Cmd[0:1],
Args: containerConfig.Cmd[1:],
Env: genEnvVars(containerConfig.Env),
Stdin: true,
WorkingDir: containerConfig.WorkingDir,
VolumeMounts: []corev1.VolumeMount{
{
Name: "agolavolume",
MountPath: podConfig.InitVolumeDir,
ReadOnly: true,
},
},
// by default always try to pull the image so we are sure only authorized users can fetch them
// see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages
ImagePullPolicy: corev1.PullAlways,
SecurityContext: &corev1.SecurityContext{
Privileged: &containerConfig.Privileged,
},
},
},
Containers: []corev1.Container{},
Volumes: []corev1.Volume{
{
Name: "agolavolume",
@ -390,6 +365,41 @@ func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Wri
},
}
// define containers
for cIndex, containerConfig := range podConfig.Containers {
var containerName string
if cIndex == 0 {
containerName = mainContainerName
} else {
containerName = fmt.Sprintf("service%d", cIndex)
}
c := corev1.Container{
Name: containerName,
Image: containerConfig.Image,
Command: containerConfig.Cmd,
Env: genEnvVars(containerConfig.Env),
Stdin: true,
WorkingDir: containerConfig.WorkingDir,
// by default always try to pull the image so we are sure only authorized users can fetch them
// see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages
ImagePullPolicy: corev1.PullAlways,
SecurityContext: &corev1.SecurityContext{
Privileged: &containerConfig.Privileged,
},
}
if cIndex == 0 {
// main container requires the initvolume containing the toolbox
c.VolumeMounts = []corev1.VolumeMount{
{
Name: "agolavolume",
MountPath: podConfig.InitVolumeDir,
ReadOnly: true,
},
}
}
pod.Spec.Containers = append(pod.Spec.Containers, c)
}
if podConfig.Arch != "" {
pod.Spec.NodeSelector = map[string]string{
corev1.LabelArchStable: string(podConfig.Arch),

View File

@ -20,6 +20,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"
uuid "github.com/satori/go.uuid"
)
@ -156,6 +157,68 @@ func TestK8sPod(t *testing.T) {
}
})
t.Run("create a pod with two containers", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
Image: "busybox",
},
&ContainerConfig{
Image: "nginx:1.16",
},
},
InitVolumeDir: "/tmp/agola",
}, ioutil.Discard)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
})
t.Run("test communication between two containers", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
Image: "busybox",
},
&ContainerConfig{
Image: "nginx:1.16",
},
},
InitVolumeDir: "/tmp/agola",
}, ioutil.Discard)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
// wait for nginx up
time.Sleep(1 * time.Second)
var buf bytes.Buffer
ce, err := pod.Exec(ctx, &ExecConfig{
Cmd: []string{"nc", "-z", "localhost", "80"},
Stdout: &buf,
})
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
code, err := ce.Wait(ctx)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if code != 0 {
t.Fatalf("unexpected exit code: %d", code)
}
})
t.Run("test get pods", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),

View File

@ -822,12 +822,6 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
return err
}
cmd := []string{toolboxContainerPath, "sleeper"}
if et.Containers[0].Entrypoint != "" {
cmd = strings.Split(et.Containers[0].Entrypoint, " ")
log.Infof("cmd: %v", cmd)
}
log.Debugf("starting pod")
dockerConfig, err := registry.GenDockerConfig(et.DockerRegistriesAuth, []string{et.Containers[0].Image})
@ -843,15 +837,24 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
Arch: et.Arch,
InitVolumeDir: toolboxContainerDir,
DockerConfig: dockerConfig,
Containers: []*driver.ContainerConfig{
{
Image: et.Containers[0].Image,
Cmd: cmd,
Env: et.Containers[0].Environment,
User: et.Containers[0].User,
Privileged: et.Containers[0].Privileged,
},
},
Containers: make([]*driver.ContainerConfig, len(et.Containers)),
}
for i, c := range et.Containers {
var cmd []string
if i == 0 {
cmd = []string{toolboxContainerPath, "sleeper"}
}
if c.Entrypoint != "" {
cmd = strings.Split(c.Entrypoint, " ")
}
podConfig.Containers[i] = &driver.ContainerConfig{
Image: c.Image,
Cmd: cmd,
Env: c.Environment,
User: c.User,
Privileged: c.Privileged,
}
}
setupLogPath := e.setupLogPath(et.ID)