// Copyright 2019 Sorint.lab // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied // See the License for the specific language governing permissions and // limitations under the License. package driver import ( "context" "encoding/json" "fmt" "io" "path/filepath" "strings" "time" "github.com/sorintlab/agola/internal/util" "github.com/docker/docker/client" "github.com/docker/docker/pkg/archive" "github.com/pkg/errors" uuid "github.com/satori/go.uuid" "github.com/sorintlab/agola/internal/common" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apilabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/remotecommand" utilexec "k8s.io/utils/exec" ) const ( mainContainerName = "maincontainer" configMapName = "agola-executors-group" executorLeasePrefix = "agola-executor-" podNamePrefix = "agola-task-" executorsGroupIDKey = labelPrefix + "executorsgroupid" executorsGroupIDConfigMapKey = "executorsgroupid" useLeaseAPIKey = labelPrefix + "useleaseapi" cmLeaseKey = labelPrefix + "lease" renewExecutorLeaseInterval = 10 * time.Second staleExecutorLeaseInterval = 1 * time.Minute ) type K8sDriver struct { log *zap.SugaredLogger restconfig *restclient.Config client *kubernetes.Clientset toolboxPath string namespace string executorID string executorsGroupID string useLeaseAPI bool } type K8sPod struct { id string namespace string labels map[string]string restconfig *restclient.Config client *kubernetes.Clientset initVolumeDir string } func NewK8sDriver(logger *zap.Logger, executorID, toolboxPath string) (*K8sDriver, error) { kubeClientConfig := NewKubeClientConfig("", "", "") kubecfg, err := kubeClientConfig.ClientConfig() if err != nil { return nil, err } kubecli, err := kubernetes.NewForConfig(kubecfg) if err != nil { return nil, fmt.Errorf("cannot create kubernetes client: %v", err) } namespace, _, err := kubeClientConfig.Namespace() if err != nil { return nil, err } d := &K8sDriver{ log: logger.Sugar(), restconfig: kubecfg, client: kubecli, toolboxPath: toolboxPath, namespace: namespace, executorID: executorID, } lists, err := d.client.Discovery().ServerPreferredResources() if err != nil { return nil, err } hasLeaseAPI := false for _, list := range lists { if len(list.APIResources) == 0 { continue } if list.GroupVersion != "coordination.k8s.io/v1" { continue } for _, apiResource := range list.APIResources { if apiResource.Kind == "Lease" { hasLeaseAPI = true } } } d.useLeaseAPI = hasLeaseAPI executorsGroupID, err := d.getOrCreateExecutorsGroupID(context.TODO()) if err != nil { return nil, err } d.executorsGroupID = executorsGroupID ctx := context.TODO() go func() { for { if err := d.updateLease(ctx); err != nil { d.log.Errorf("failed to update executor lease: %+v", err) } select { case <-ctx.Done(): return default: } time.Sleep(renewExecutorLeaseInterval) } }() go func() { for { if err := d.cleanStaleExecutorsLease(ctx); err != nil { d.log.Errorf("failed to clean stale executors lease: %+v", err) } select { case <-ctx.Done(): return default: } time.Sleep(renewExecutorLeaseInterval) } }() return d, nil } // NewKubeClientConfig return a kube client config that will by default use an // in cluster client config or, if not available or overriden an external client // config using the default client behavior used also by kubectl. func NewKubeClientConfig(kubeconfigPath, context, namespace string) clientcmd.ClientConfig { rules := clientcmd.NewDefaultClientConfigLoadingRules() rules.DefaultClientConfig = &clientcmd.DefaultClientConfig if kubeconfigPath != "" { rules.ExplicitPath = kubeconfigPath } overrides := &clientcmd.ConfigOverrides{ClusterDefaults: clientcmd.ClusterDefaults} if context != "" { overrides.CurrentContext = context } if namespace != "" { overrides.Context.Namespace = namespace } return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides) } func (d *K8sDriver) Setup(ctx context.Context) error { return nil } func (d *K8sDriver) Archs(ctx context.Context) ([]common.Arch, error) { // TODO(sgotti) use go client listers instead of querying every time nodeClient := d.client.CoreV1().Nodes() nodes, err := nodeClient.List(metav1.ListOptions{}) if err != nil { return nil, err } archsMap := map[common.Arch]struct{}{} archs := []common.Arch{} for _, node := range nodes.Items { archsMap[common.ArchFromString(node.Status.NodeInfo.Architecture)] = struct{}{} } for arch := range archsMap { archs = append(archs, arch) } return archs, nil } func (d *K8sDriver) ExecutorGroup(ctx context.Context) (string, error) { return d.executorsGroupID, nil } func (d *K8sDriver) GetExecutors(ctx context.Context) ([]string, error) { return d.getLeases((ctx)) } // executorsGroups gets or creates (if it doesn't exists) a configmap under // the k8s namespace where the executorsgroup id is saved. The executorsgroupid // is unique per k8s namespace and is shared by all the executors accessing this // namespace func (d *K8sDriver) getOrCreateExecutorsGroupID(ctx context.Context) (string, error) { cmClient := d.client.CoreV1().ConfigMaps(d.namespace) // pod and secret name, based on pod id cm, err := cmClient.Get(configMapName, metav1.GetOptions{}) if err != nil { if !apierrors.IsNotFound(err) { return "", err } } else { return cm.Data[executorsGroupIDConfigMapKey], nil } executorsGroupID := uuid.NewV4().String() cm = &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: configMapName, }, Data: map[string]string{executorsGroupIDConfigMapKey: executorsGroupID}, } cm, err = cmClient.Create(cm) if err != nil { return "", err } return executorsGroupID, nil } func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Writer) (Pod, error) { if len(podConfig.Containers) == 0 { return nil, errors.Errorf("empty container config") } containerConfig := podConfig.Containers[0] secretClient := d.client.CoreV1().Secrets(d.namespace) podClient := d.client.CoreV1().Pods(d.namespace) labels := map[string]string{} labels[agolaLabelKey] = agolaLabelValue labels[podIDKey] = podConfig.ID labels[taskIDKey] = podConfig.TaskID labels[executorIDKey] = d.executorID labels[executorsGroupIDKey] = d.executorsGroupID dockerconfigj, err := json.Marshal(podConfig.DockerConfig) if err != nil { return nil, err } // pod and secret name, based on pod id name := podNamePrefix + podConfig.ID // secret that hold the docker registry auth secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: name, Labels: labels, }, Data: map[string][]byte{ ".dockerconfigjson": dockerconfigj, }, Type: corev1.SecretTypeDockerConfigJson, } secret, err = secretClient.Create(secret) if err != nil { return nil, err } pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: d.namespace, Name: name, Labels: labels, }, Spec: corev1.PodSpec{ ImagePullSecrets: []corev1.LocalObjectReference{{Name: name}}, // don't mount service account secrets or pods will be able to talk with k8s // api AutomountServiceAccountToken: util.BoolP(false), InitContainers: []corev1.Container{ { Name: "initcontainer", Image: "busybox", // wait for a file named /tmp/done and then exit Command: []string{"/bin/sh", "-c", "while true; do if [[ -f /tmp/done ]]; then exit; fi; sleep 1; done"}, Stdin: true, VolumeMounts: []corev1.VolumeMount{ { Name: "agolavolume", MountPath: podConfig.InitVolumeDir, }, }, }, }, Containers: []corev1.Container{ { Name: mainContainerName, Image: containerConfig.Image, Command: containerConfig.Cmd[0:1], Args: containerConfig.Cmd[1:], Env: genEnvVars(containerConfig.Env), Stdin: true, WorkingDir: containerConfig.WorkingDir, VolumeMounts: []corev1.VolumeMount{ { Name: "agolavolume", MountPath: podConfig.InitVolumeDir, ReadOnly: true, }, }, // by default always try to pull the image so we are sure only authorized users can fetch them // see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages ImagePullPolicy: corev1.PullAlways, SecurityContext: &corev1.SecurityContext{ Privileged: &containerConfig.Privileged, }, }, }, Volumes: []corev1.Volume{ { Name: "agolavolume", VolumeSource: corev1.VolumeSource{ EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, }, }, } pod, err = podClient.Create(pod) if err != nil { return nil, err } watcher, err := podClient.Watch( metav1.SingleObject(pod.ObjectMeta), ) if err != nil { return nil, err } // wait for init container to be ready for event := range watcher.ResultChan() { switch event.Type { case watch.Modified: pod := event.Object.(*corev1.Pod) if len(pod.Status.InitContainerStatuses) > 0 { if pod.Status.InitContainerStatuses[0].State.Running != nil { watcher.Stop() } } case watch.Deleted: return nil, errors.Errorf("pod %q has been deleted", pod.Name) } } fmt.Fprintf(out, "init container ready\n") srcInfo, err := archive.CopyInfoSourcePath(d.toolboxPath, false) if err != nil { return nil, err } srcArchive, err := archive.TarResource(srcInfo) if err != nil { return nil, err } defer srcArchive.Close() coreclient, err := corev1client.NewForConfig(d.restconfig) if err != nil { return nil, err } req := coreclient.RESTClient(). Post(). Namespace(pod.Namespace). Resource("pods"). Name(pod.Name). SubResource("exec"). VersionedParams(&corev1.PodExecOptions{ Container: "initcontainer", Command: []string{"tar", "xf", "-", "-C", podConfig.InitVolumeDir}, Stdin: true, Stdout: true, Stderr: true, TTY: false, }, scheme.ParameterCodec) exec, err := remotecommand.NewSPDYExecutor(d.restconfig, "POST", req.URL()) if err != nil { return nil, errors.Wrapf(err, "failed to generate k8s client spdy executor for url %q, method: POST", req.URL()) } err = exec.Stream(remotecommand.StreamOptions{ Stdin: srcArchive, Stdout: out, Stderr: out, }) if err != nil { return nil, errors.Wrapf(err, "failed to execute command on initcontainer") } req = coreclient.RESTClient(). Post(). Namespace(pod.Namespace). Resource("pods"). Name(pod.Name). SubResource("exec"). VersionedParams(&corev1.PodExecOptions{ Container: "initcontainer", Command: []string{"touch", "/tmp/done"}, Stdout: true, Stderr: true, TTY: false, }, scheme.ParameterCodec) exec, err = remotecommand.NewSPDYExecutor(d.restconfig, "POST", req.URL()) if err != nil { return nil, errors.Wrapf(err, "failed to generate k8s client spdy executor for url %q, method: POST", req.URL()) } err = exec.Stream(remotecommand.StreamOptions{ Stdout: out, Stderr: out, }) if err != nil { return nil, errors.Wrapf(err, "failed to execute command on initcontainer") } watcher, err = podClient.Watch( metav1.SingleObject(pod.ObjectMeta), ) if err != nil { return nil, err } // wait for pod to be initialized for event := range watcher.ResultChan() { switch event.Type { case watch.Modified: pod := event.Object.(*corev1.Pod) if len(pod.Status.ContainerStatuses) > 0 { if pod.Status.ContainerStatuses[0].State.Running != nil { watcher.Stop() } } case watch.Deleted: return nil, errors.Errorf("pod %q has been deleted", pod.Name) } } return &K8sPod{ id: pod.Name, namespace: pod.Namespace, restconfig: d.restconfig, client: d.client, initVolumeDir: podConfig.InitVolumeDir, }, nil } func (d *K8sDriver) GetPods(ctx context.Context, all bool) ([]Pod, error) { podClient := d.client.CoreV1().Pods(d.namespace) // get all pods for the executor group, also the ones managed by other executors in the same executor group labels := map[string]string{executorsGroupIDKey: d.executorsGroupID} // TODO(sgotti) use go client listers instead of querying every time k8sPods, err := podClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()}) if err != nil { return nil, err } pods := make([]Pod, len(k8sPods.Items)) for i, k8sPod := range k8sPods.Items { labels := map[string]string{} // keep only labels starting with our prefix for n, v := range k8sPod.Labels { if strings.HasPrefix(n, labelPrefix) { labels[n] = v } } pods[i] = &K8sPod{ id: k8sPod.Name, namespace: k8sPod.Namespace, labels: labels, restconfig: d.restconfig, client: d.client, } } return pods, nil } func (p *K8sPod) ID() string { return p.id } func (p *K8sPod) ExecutorID() string { return p.labels[executorIDKey] } func (p *K8sPod) TaskID() string { return p.labels[taskIDKey] } func (p *K8sPod) Stop(ctx context.Context) error { d := int64(0) secretClient := p.client.CoreV1().Secrets(p.namespace) if err := secretClient.Delete(p.id, &metav1.DeleteOptions{GracePeriodSeconds: &d}); err != nil { return err } podClient := p.client.CoreV1().Pods(p.namespace) if err := podClient.Delete(p.id, &metav1.DeleteOptions{GracePeriodSeconds: &d}); err != nil { return err } return nil } func (p *K8sPod) Remove(ctx context.Context) error { return p.Stop(ctx) } type K8sContainerExec struct { execID string client *client.Client endCh chan error stdin io.WriteCloser } func (p *K8sPod) Exec(ctx context.Context, execConfig *ExecConfig) (ContainerExec, error) { endCh := make(chan error) coreclient, err := corev1client.NewForConfig(p.restconfig) if err != nil { return nil, err } // k8s pod exec api doesn't let us define the workingdir and the environment. // Use a toolbox command that will set them up and then exec the real command. envj, err := json.Marshal(execConfig.Env) if err != nil { return nil, err } cmd := []string{filepath.Join(p.initVolumeDir, "agola-toolbox"), "exec", "-e", string(envj), "-w", execConfig.WorkingDir, "--"} cmd = append(cmd, execConfig.Cmd...) req := coreclient.RESTClient(). Post(). Namespace(p.namespace). Resource("pods"). Name(p.id). SubResource("exec"). VersionedParams(&corev1.PodExecOptions{ Container: mainContainerName, Command: cmd, Stdin: true, Stdout: execConfig.Stdout != nil, Stderr: execConfig.Stderr != nil, TTY: execConfig.Tty, }, scheme.ParameterCodec) exec, err := remotecommand.NewSPDYExecutor(p.restconfig, "POST", req.URL()) if err != nil { return nil, err } reader, writer := io.Pipe() go func() { err := exec.Stream(remotecommand.StreamOptions{ Stdin: reader, Stdout: execConfig.Stdout, Stderr: execConfig.Stderr, Tty: execConfig.Tty, }) endCh <- err }() return &K8sContainerExec{ stdin: writer, endCh: endCh, }, nil } func (e *K8sContainerExec) Wait(ctx context.Context) (int, error) { err := <-e.endCh var exitCode int if err != nil { switch err := err.(type) { case utilexec.ExitError: exitCode = err.ExitStatus() default: return -1, err } } return exitCode, nil } func (e *K8sContainerExec) Stdin() io.WriteCloser { return e.stdin } func genEnvVars(env map[string]string) []corev1.EnvVar { envVars := make([]corev1.EnvVar, 0, len(env)) for n, v := range env { envVars = append(envVars, corev1.EnvVar{Name: n, Value: v}) } return envVars }