This was already defined in the config but not implemented in the executor and drivers. All the containers defined in the runtime after the first one will be "service" containers. They will share the same network namespace with the other containers in the "pod" so they can communicate between themself on loopback
744 lines
19 KiB
744 lines
19 KiB
// Copyright 2019 Sorint.lab
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package driver
import (
uuid ""
corev1 ""
apierrors ""
metav1 ""
apilabels ""
corev1client ""
coordinationlistersv1 ""
listerscorev1 ""
restclient ""
utilexec ""
const (
mainContainerName = "maincontainer"
configMapName = "agola-executors-group"
executorLeasePrefix = "agola-executor-"
podNamePrefix = "agola-task-"
executorsGroupIDKey = labelPrefix + "executorsgroupid"
executorsGroupIDConfigMapKey = "executorsgroupid"
useLeaseAPIKey = labelPrefix + "useleaseapi"
cmLeaseKey = labelPrefix + "lease"
renewExecutorLeaseInterval = 10 * time.Second
staleExecutorLeaseInterval = 1 * time.Minute
informerResyncInterval = 10 * time.Second
type K8sDriver struct {
log *zap.SugaredLogger
restconfig *restclient.Config
client *kubernetes.Clientset
toolboxPath string
namespace string
executorID string
executorsGroupID string
useLeaseAPI bool
nodeLister listerscorev1.NodeLister
podLister listerscorev1.PodLister
cmLister listerscorev1.ConfigMapLister
leaseLister coordinationlistersv1.LeaseLister
type K8sPod struct {
id string
namespace string
labels map[string]string
restconfig *restclient.Config
client *kubernetes.Clientset
initVolumeDir string
func NewK8sDriver(logger *zap.Logger, executorID, toolboxPath string) (*K8sDriver, error) {
kubeClientConfig := NewKubeClientConfig("", "", "")
kubecfg, err := kubeClientConfig.ClientConfig()
if err != nil {
return nil, err
kubecli, err := kubernetes.NewForConfig(kubecfg)
if err != nil {
return nil, fmt.Errorf("cannot create kubernetes client: %v", err)
namespace, _, err := kubeClientConfig.Namespace()
if err != nil {
return nil, err
d := &K8sDriver{
log: logger.Sugar(),
restconfig: kubecfg,
client: kubecli,
toolboxPath: toolboxPath,
namespace: namespace,
executorID: executorID,
lists, err := d.client.Discovery().ServerPreferredResources()
if err != nil {
return nil, err
hasLeaseAPI := false
for _, list := range lists {
if len(list.APIResources) == 0 {
if list.GroupVersion != "" {
for _, apiResource := range list.APIResources {
if apiResource.Kind == "Lease" {
hasLeaseAPI = true
d.useLeaseAPI = hasLeaseAPI
executorsGroupID, err := d.getOrCreateExecutorsGroupID(context.TODO())
if err != nil {
return nil, err
d.executorsGroupID = executorsGroupID
ctx := context.TODO()
go func() {
for {
if err := d.updateLease(ctx); err != nil {
d.log.Errorf("failed to update executor lease: %+v", err)
select {
case <-ctx.Done():
go func() {
for {
if err := d.cleanStaleExecutorsLease(ctx); err != nil {
d.log.Errorf("failed to clean stale executors lease: %+v", err)
select {
case <-ctx.Done():
factory := informers.NewSharedInformerFactoryWithOptions(d.client, informerResyncInterval, informers.WithNamespace(d.namespace))
nodeInformer := factory.Core().V1().Nodes()
d.nodeLister = nodeInformer.Lister()
go nodeInformer.Informer().Run(ctx.Done())
podInformer := factory.Core().V1().Pods()
d.podLister = podInformer.Lister()
go podInformer.Informer().Run(ctx.Done())
if d.useLeaseAPI {
leaseInformer := factory.Coordination().V1().Leases()
d.leaseLister = leaseInformer.Lister()
go leaseInformer.Informer().Run(ctx.Done())
} else {
cmInformer := factory.Core().V1().ConfigMaps()
d.cmLister = cmInformer.Lister()
go cmInformer.Informer().Run(ctx.Done())
return d, nil
// NewKubeClientConfig return a kube client config that will by default use an
// in cluster client config or, if not available or overriden an external client
// config using the default client behavior used also by kubectl.
func NewKubeClientConfig(kubeconfigPath, context, namespace string) clientcmd.ClientConfig {
rules := clientcmd.NewDefaultClientConfigLoadingRules()
rules.DefaultClientConfig = &clientcmd.DefaultClientConfig
if kubeconfigPath != "" {
rules.ExplicitPath = kubeconfigPath
overrides := &clientcmd.ConfigOverrides{ClusterDefaults: clientcmd.ClusterDefaults}
if context != "" {
overrides.CurrentContext = context
if namespace != "" {
overrides.Context.Namespace = namespace
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides)
func (d *K8sDriver) Setup(ctx context.Context) error {
return nil
func (d *K8sDriver) Archs(ctx context.Context) ([]common.Arch, error) {
// TODO(sgotti) use go client listers instead of querying every time
nodes, err := d.nodeLister.List(apilabels.SelectorFromSet(nil))
if err != nil {
return nil, err
archsMap := map[common.Arch]struct{}{}
archs := []common.Arch{}
for _, node := range nodes {
archsMap[common.ArchFromString(node.Status.NodeInfo.Architecture)] = struct{}{}
for arch := range archsMap {
archs = append(archs, arch)
return archs, nil
func (d *K8sDriver) ExecutorGroup(ctx context.Context) (string, error) {
return d.executorsGroupID, nil
func (d *K8sDriver) GetExecutors(ctx context.Context) ([]string, error) {
return d.getLeases((ctx))
// executorsGroups gets or creates (if it doesn't exists) a configmap under
// the k8s namespace where the executorsgroup id is saved. The executorsgroupid
// is unique per k8s namespace and is shared by all the executors accessing this
// namespace
func (d *K8sDriver) getOrCreateExecutorsGroupID(ctx context.Context) (string, error) {
cmClient := d.client.CoreV1().ConfigMaps(d.namespace)
// pod and secret name, based on pod id
cm, err := cmClient.Get(configMapName, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return "", err
} else {
return cm.Data[executorsGroupIDConfigMapKey], nil
executorsGroupID := uuid.NewV4().String()
cm = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName,
Data: map[string]string{executorsGroupIDConfigMapKey: executorsGroupID},
cm, err = cmClient.Create(cm)
if err != nil {
return "", err
return executorsGroupID, nil
func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Writer) (Pod, error) {
if len(podConfig.Containers) == 0 {
return nil, errors.Errorf("empty container config")
secretClient := d.client.CoreV1().Secrets(d.namespace)
podClient := d.client.CoreV1().Pods(d.namespace)
labels := map[string]string{}
labels[agolaLabelKey] = agolaLabelValue
labels[podIDKey] = podConfig.ID
labels[taskIDKey] = podConfig.TaskID
labels[executorIDKey] = d.executorID
labels[executorsGroupIDKey] = d.executorsGroupID
dockerconfigj, err := json.Marshal(podConfig.DockerConfig)
if err != nil {
return nil, err
// pod and secret name, based on pod id
name := podNamePrefix + podConfig.ID
// secret that hold the docker registry auth
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: labels,
Data: map[string][]byte{
".dockerconfigjson": dockerconfigj,
Type: corev1.SecretTypeDockerConfigJson,
secret, err = secretClient.Create(secret)
if err != nil {
return nil, err
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: d.namespace,
Name: name,
Labels: labels,
Spec: corev1.PodSpec{
ImagePullSecrets: []corev1.LocalObjectReference{{Name: name}},
// don't mount service account secrets or pods will be able to talk with k8s
// api
AutomountServiceAccountToken: util.BoolP(false),
InitContainers: []corev1.Container{
Name: "initcontainer",
Image: "busybox",
// wait for a file named /tmp/done and then exit
Command: []string{"/bin/sh", "-c", "while true; do if [[ -f /tmp/done ]]; then exit; fi; sleep 1; done"},
Stdin: true,
VolumeMounts: []corev1.VolumeMount{
Name: "agolavolume",
MountPath: podConfig.InitVolumeDir,
Containers: []corev1.Container{},
Volumes: []corev1.Volume{
Name: "agolavolume",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
// define containers
for cIndex, containerConfig := range podConfig.Containers {
var containerName string
if cIndex == 0 {
containerName = mainContainerName
} else {
containerName = fmt.Sprintf("service%d", cIndex)
c := corev1.Container{
Name: containerName,
Image: containerConfig.Image,
Command: containerConfig.Cmd,
Env: genEnvVars(containerConfig.Env),
Stdin: true,
WorkingDir: containerConfig.WorkingDir,
// by default always try to pull the image so we are sure only authorized users can fetch them
// see
ImagePullPolicy: corev1.PullAlways,
SecurityContext: &corev1.SecurityContext{
Privileged: &containerConfig.Privileged,
if cIndex == 0 {
// main container requires the initvolume containing the toolbox
c.VolumeMounts = []corev1.VolumeMount{
Name: "agolavolume",
MountPath: podConfig.InitVolumeDir,
ReadOnly: true,
pod.Spec.Containers = append(pod.Spec.Containers, c)
if podConfig.Arch != "" {
pod.Spec.NodeSelector = map[string]string{
corev1.LabelArchStable: string(podConfig.Arch),
pod, err = podClient.Create(pod)
if err != nil {
return nil, err
watcher, err := podClient.Watch(
if err != nil {
return nil, err
// wait for init container to be ready
for event := range watcher.ResultChan() {
switch event.Type {
case watch.Modified:
pod := event.Object.(*corev1.Pod)
if len(pod.Status.InitContainerStatuses) > 0 {
if pod.Status.InitContainerStatuses[0].State.Running != nil {
case watch.Deleted:
return nil, errors.Errorf("pod %q has been deleted", pod.Name)
fmt.Fprintf(out, "init container ready\n")
coreclient, err := corev1client.NewForConfig(d.restconfig)
if err != nil {
return nil, err
// get the pod arch
req := coreclient.RESTClient().
Container: "initcontainer",
Command: []string{"uname", "-m"},
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(d.restconfig, "POST", req.URL())
if err != nil {
return nil, errors.Wrapf(err, "failed to generate k8s client spdy executor for url %q, method: POST", req.URL())
stdout := bytes.Buffer{}
err = exec.Stream(remotecommand.StreamOptions{
Stdout: &stdout,
Stderr: out,
if err != nil {
return nil, errors.Wrapf(err, "failed to execute command on initcontainer")
osArch := strings.TrimSpace(stdout.String())
var arch common.Arch
switch osArch {
case "x86_64":
arch = common.ArchAMD64
case "aarch64":
arch = common.ArchARM64
return nil, errors.Errorf("unsupported pod arch %q", osArch)
// copy the toolbox for the pod arch
toolboxExecPath, err := toolboxExecPath(d.toolboxPath, arch)
if err != nil {
return nil, errors.Wrapf(err, "failed to get toolbox path for arch %q", arch)
srcInfo, err := archive.CopyInfoSourcePath(toolboxExecPath, false)
if err != nil {
return nil, err
srcInfo.RebaseName = "agola-toolbox"
srcArchive, err := archive.TarResource(srcInfo)
if err != nil {
return nil, err
defer srcArchive.Close()
req = coreclient.RESTClient().
Container: "initcontainer",
Command: []string{"tar", "xf", "-", "-C", podConfig.InitVolumeDir},
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
exec, err = remotecommand.NewSPDYExecutor(d.restconfig, "POST", req.URL())
if err != nil {
return nil, errors.Wrapf(err, "failed to generate k8s client spdy executor for url %q, method: POST", req.URL())
fmt.Fprintf(out, "extracting toolbox\n")
err = exec.Stream(remotecommand.StreamOptions{
Stdin: srcArchive,
Stdout: out,
Stderr: out,
if err != nil {
return nil, errors.Wrapf(err, "failed to execute command on initcontainer")
fmt.Fprintf(out, "extracting toolbox done\n")
req = coreclient.RESTClient().
Container: "initcontainer",
Command: []string{"touch", "/tmp/done"},
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
exec, err = remotecommand.NewSPDYExecutor(d.restconfig, "POST", req.URL())
if err != nil {
return nil, errors.Wrapf(err, "failed to generate k8s client spdy executor for url %q, method: POST", req.URL())
err = exec.Stream(remotecommand.StreamOptions{
Stdout: out,
Stderr: out,
if err != nil {
return nil, errors.Wrapf(err, "failed to execute command on initcontainer")
watcher, err = podClient.Watch(
if err != nil {
return nil, err
// wait for pod to be initialized
for event := range watcher.ResultChan() {
switch event.Type {
case watch.Modified:
pod := event.Object.(*corev1.Pod)
if len(pod.Status.ContainerStatuses) > 0 {
if pod.Status.ContainerStatuses[0].State.Running != nil {
case watch.Deleted:
return nil, errors.Errorf("pod %q has been deleted", pod.Name)
return &K8sPod{
id: pod.Name,
namespace: pod.Namespace,
restconfig: d.restconfig,
client: d.client,
initVolumeDir: podConfig.InitVolumeDir,
}, nil
func (d *K8sDriver) GetPods(ctx context.Context, all bool) ([]Pod, error) {
// get all pods for the executor group, also the ones managed by other executors in the same executor group
labels := map[string]string{executorsGroupIDKey: d.executorsGroupID}
k8sPods, err := d.podLister.List(apilabels.SelectorFromSet(labels))
if err != nil {
return nil, err
pods := make([]Pod, len(k8sPods))
for i, k8sPod := range k8sPods {
labels := map[string]string{}
// keep only labels starting with our prefix
for n, v := range k8sPod.Labels {
if strings.HasPrefix(n, labelPrefix) {
labels[n] = v
pods[i] = &K8sPod{
id: k8sPod.Name,
namespace: k8sPod.Namespace,
labels: labels,
restconfig: d.restconfig,
client: d.client,
return pods, nil
func (p *K8sPod) ID() string {
func (p *K8sPod) ExecutorID() string {
return p.labels[executorIDKey]
func (p *K8sPod) TaskID() string {
return p.labels[taskIDKey]
func (p *K8sPod) Stop(ctx context.Context) error {
d := int64(0)
secretClient := p.client.CoreV1().Secrets(p.namespace)
if err := secretClient.Delete(, &metav1.DeleteOptions{GracePeriodSeconds: &d}); err != nil {
return err
podClient := p.client.CoreV1().Pods(p.namespace)
if err := podClient.Delete(, &metav1.DeleteOptions{GracePeriodSeconds: &d}); err != nil {
return err
return nil
func (p *K8sPod) Remove(ctx context.Context) error {
return p.Stop(ctx)
type K8sContainerExec struct {
execID string
client *client.Client
endCh chan error
stdin io.WriteCloser
func (p *K8sPod) Exec(ctx context.Context, execConfig *ExecConfig) (ContainerExec, error) {
endCh := make(chan error)
coreclient, err := corev1client.NewForConfig(p.restconfig)
if err != nil {
return nil, err
// k8s pod exec api doesn't let us define the workingdir and the environment.
// Use a toolbox command that will set them up and then exec the real command.
envj, err := json.Marshal(execConfig.Env)
if err != nil {
return nil, err
cmd := []string{filepath.Join(p.initVolumeDir, "agola-toolbox"), "exec", "-e", string(envj), "-w", execConfig.WorkingDir, "--"}
cmd = append(cmd, execConfig.Cmd...)
req := coreclient.RESTClient().
Container: mainContainerName,
Command: cmd,
Stdin: execConfig.AttachStdin,
Stdout: execConfig.Stdout != nil,
Stderr: execConfig.Stderr != nil,
TTY: execConfig.Tty,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(p.restconfig, "POST", req.URL())
if err != nil {
return nil, err
reader, writer := io.Pipe()
var stdin io.Reader
if execConfig.AttachStdin {
stdin = reader
go func() {
err := exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: execConfig.Stdout,
Stderr: execConfig.Stderr,
Tty: execConfig.Tty,
endCh <- err
return &K8sContainerExec{
stdin: writer,
endCh: endCh,
}, nil
func (e *K8sContainerExec) Wait(ctx context.Context) (int, error) {
err := <-e.endCh
var exitCode int
if err != nil {
switch err := err.(type) {
case utilexec.ExitError:
exitCode = err.ExitStatus()
return -1, err
return exitCode, nil
func (e *K8sContainerExec) Stdin() io.WriteCloser {
return e.stdin
func genEnvVars(env map[string]string) []corev1.EnvVar {
envVars := make([]corev1.EnvVar, 0, len(env))
for n, v := range env {
envVars = append(envVars, corev1.EnvVar{Name: n, Value: v})
return envVars