diff --git a/internal/services/executor/driver/docker.go b/internal/services/executor/driver/docker.go index 3639d22..edd12d8 100644 --- a/internal/services/executor/driver/docker.go +++ b/internal/services/executor/driver/docker.go @@ -51,7 +51,7 @@ type DockerDriver struct { } func NewDockerDriver(logger *zap.Logger, executorID, initVolumeHostDir, toolboxPath string) (*DockerDriver, error) { - cli, err := client.NewEnvClient() + cli, err := client.NewClientWithOpts(client.FromEnv) if err != nil { return nil, err } @@ -80,7 +80,9 @@ func (d *DockerDriver) CopyToolbox(ctx context.Context) error { if err != nil { return err } - io.Copy(os.Stdout, reader) + if _, err := io.Copy(os.Stdout, reader); err != nil { + return err + } resp, err := d.client.ContainerCreate(ctx, &container.Config{ Entrypoint: []string{"cat"}, @@ -125,7 +127,7 @@ func (d *DockerDriver) CopyToolbox(ctx context.Context) error { } // ignore remove error - d.client.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{Force: true}) + _ = d.client.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{Force: true}) return nil } @@ -389,17 +391,6 @@ func (d *DockerDriver) GetPods(ctx context.Context, all bool) ([]Pod, error) { return pods, nil } -func podLabelsFromContainer(containerLabels map[string]string) map[string]string { - labels := map[string]string{} - // keep only labels starting with our prefix - for k, v := range containerLabels { - if strings.HasPrefix(k, labelPrefix) { - labels[k] = v - } - } - return labels -} - type DockerPod struct { id string client *client.Client diff --git a/internal/services/executor/driver/docker_test.go b/internal/services/executor/driver/docker_test.go index d511141..b35e54c 100644 --- a/internal/services/executor/driver/docker_test.go +++ b/internal/services/executor/driver/docker_test.go @@ -38,7 +38,6 @@ import ( var level = zap.NewAtomicLevelAt(zapcore.InfoLevel) var logger = slog.New(level) -var log = logger.Sugar() func parseEnv(envvar string) (string, string, error) { // trim white spaces at the start @@ -112,7 +111,7 @@ func TestDockerPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() }) t.Run("execute a command inside a pod", func(t *testing.T) { @@ -130,7 +129,7 @@ func TestDockerPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() ce, err := pod.Exec(ctx, &ExecConfig{ Cmd: []string{"ls"}, @@ -169,7 +168,7 @@ func TestDockerPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() var buf bytes.Buffer ce, err := pod.Exec(ctx, &ExecConfig{ @@ -223,7 +222,7 @@ func TestDockerPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() }) t.Run("test communication between two containers", func(t *testing.T) { @@ -244,7 +243,7 @@ func TestDockerPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() // wait for nginx up time.Sleep(1 * time.Second) @@ -280,7 +279,7 @@ func TestDockerPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() pods, err := d.GetPods(ctx, true) if err != nil { @@ -326,7 +325,7 @@ func TestDockerPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() pods, err := d.GetPods(ctx, true) if err != nil { @@ -372,7 +371,7 @@ func TestDockerPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() // delete the first container dp := pod.(*DockerPod) diff --git a/internal/services/executor/driver/k8s.go b/internal/services/executor/driver/k8s.go index 0b4c167..2caa578 100644 --- a/internal/services/executor/driver/k8s.go +++ b/internal/services/executor/driver/k8s.go @@ -27,7 +27,6 @@ import ( "agola.io/agola/internal/common" "agola.io/agola/internal/util" - "github.com/docker/docker/client" "github.com/docker/docker/pkg/archive" uuid "github.com/satori/go.uuid" "go.uber.org/zap" @@ -58,7 +57,6 @@ const ( executorsGroupIDKey = labelPrefix + "executorsgroupid" executorsGroupIDConfigMapKey = "executorsgroupid" - useLeaseAPIKey = labelPrefix + "useleaseapi" cmLeaseKey = labelPrefix + "lease" renewExecutorLeaseInterval = 10 * time.Second @@ -279,8 +277,7 @@ func (d *K8sDriver) getOrCreateExecutorsGroupID(ctx context.Context) (string, er }, Data: map[string]string{executorsGroupIDConfigMapKey: executorsGroupID}, } - cm, err = cmClient.Create(cm) - if err != nil { + if _, err = cmClient.Create(cm); err != nil { return "", err } @@ -322,7 +319,7 @@ func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Wri Type: corev1.SecretTypeDockerConfigJson, } - secret, err = secretClient.Create(secret) + _, err = secretClient.Create(secret) if err != nil { return nil, err } @@ -647,9 +644,7 @@ func (p *K8sPod) Remove(ctx context.Context) error { } type K8sContainerExec struct { - execID string - client *client.Client - endCh chan error + endCh chan error stdin io.WriteCloser } diff --git a/internal/services/executor/driver/k8s_test.go b/internal/services/executor/driver/k8s_test.go index ae568cc..4073fa2 100644 --- a/internal/services/executor/driver/k8s_test.go +++ b/internal/services/executor/driver/k8s_test.go @@ -62,7 +62,7 @@ func TestK8sPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() }) t.Run("execute a command inside a pod", func(t *testing.T) { @@ -80,7 +80,7 @@ func TestK8sPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() var buf bytes.Buffer ce, err := pod.Exec(ctx, &ExecConfig{ @@ -121,7 +121,7 @@ func TestK8sPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() var buf bytes.Buffer ce, err := pod.Exec(ctx, &ExecConfig{ @@ -175,7 +175,7 @@ func TestK8sPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() }) t.Run("test communication between two containers", func(t *testing.T) { @@ -196,7 +196,7 @@ func TestK8sPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() // wait for nginx up time.Sleep(1 * time.Second) @@ -234,7 +234,7 @@ func TestK8sPod(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - defer pod.Remove(ctx) + defer func() { _ = pod.Remove(ctx) }() pods, err := d.GetPods(ctx, true) if err != nil { diff --git a/internal/services/executor/driver/k8slease.go b/internal/services/executor/driver/k8slease.go index 2da5d03..ed728a6 100644 --- a/internal/services/executor/driver/k8slease.go +++ b/internal/services/executor/driver/k8slease.go @@ -45,7 +45,7 @@ func (d *K8sDriver) updateLease(ctx context.Context) error { if d.useLeaseAPI { duration := int32(duration) - now := metav1.MicroTime{now} + now := metav1.MicroTime{Time: now} leaseClient := d.client.CoordinationV1().Leases(d.namespace) found := false @@ -76,7 +76,7 @@ func (d *K8sDriver) updateLease(ctx context.Context) error { RenewTime: &now, }, } - lease, err = leaseClient.Create(lease) + _, err = leaseClient.Create(lease) return err } else { cmClient := d.client.CoreV1().ConfigMaps(d.namespace) @@ -128,10 +128,9 @@ func (d *K8sDriver) updateLease(ctx context.Context) error { }, } cm.Annotations[cmLeaseKey] = string(ldj) - cm, err = cmClient.Create(cm) + _, err = cmClient.Create(cm) return err } - return nil } func (d *K8sDriver) getLeases(ctx context.Context) ([]string, error) { diff --git a/internal/services/executor/executor.go b/internal/services/executor/executor.go index e914f1e..71eda73 100644 --- a/internal/services/executor/executor.go +++ b/internal/services/executor/executor.go @@ -84,8 +84,7 @@ func (e *Executor) createFile(ctx context.Context, pod driver.Pod, command, user stdin := ce.Stdin() go func() { - io.WriteString(stdin, command) - io.WriteString(stdin, "\n") + _, _ = io.WriteString(stdin, command+"\n") stdin.Close() }() @@ -157,7 +156,7 @@ func (e *Executor) doRunStep(ctx context.Context, s *types.RunStep, t *types.Exe workingDir, err = e.expandDir(ctx, t, pod, outf, workingDir) if err != nil { - outf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", workingDir, err)) + _, _ = outf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", workingDir, err)) return -1, err } @@ -208,7 +207,7 @@ func (e *Executor) doSaveToWorkspaceStep(ctx context.Context, s *types.SaveToWor workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) if err != nil { - logf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) + _, _ = logf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) return -1, err } @@ -254,7 +253,7 @@ func (e *Executor) doSaveToWorkspaceStep(ctx context.Context, s *types.SaveToWor enc := json.NewEncoder(stdin) go func() { - enc.Encode(a) + _ = enc.Encode(a) stdin.Close() }() @@ -333,7 +332,7 @@ func (e *Executor) template(ctx context.Context, t *types.ExecutorTask, pod driv workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) if err != nil { - io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) + _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) return "", err } @@ -353,7 +352,7 @@ func (e *Executor) template(ctx context.Context, t *types.ExecutorTask, pod driv stdin := ce.Stdin() go func() { - io.WriteString(stdin, key) + _, _ = io.WriteString(stdin, key) stdin.Close() }() @@ -380,7 +379,7 @@ func (e *Executor) unarchive(ctx context.Context, t *types.ExecutorTask, source workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) if err != nil { - io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) + _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) return err } @@ -400,7 +399,7 @@ func (e *Executor) unarchive(ctx context.Context, t *types.ExecutorTask, source stdin := ce.Stdin() go func() { - io.Copy(stdin, source) + _, _ = io.Copy(stdin, source) stdin.Close() }() @@ -498,7 +497,7 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep, workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir) if err != nil { - io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) + _, _ = io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err)) return -1, err } @@ -544,7 +543,7 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep, enc := json.NewEncoder(stdin) go func() { - enc.Encode(a) + _ = enc.Encode(a) stdin.Close() }() @@ -848,13 +847,13 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { // error out if privileged containers are required but not allowed requiresPrivilegedContainers := false for _, c := range et.Containers { - if c.Privileged == true { + if c.Privileged { requiresPrivilegedContainers = true break } } - if requiresPrivilegedContainers == true && e.c.AllowPrivilegedContainers == false { - outf.WriteString("Executor doesn't allow executing privileged containers.\n") + if requiresPrivilegedContainers && !e.c.AllowPrivilegedContainers { + _, _ = outf.WriteString("Executor doesn't allow executing privileged containers.\n") return errors.Errorf("executor doesn't allow executing privileged containers") } @@ -893,18 +892,18 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { } } - outf.WriteString("Starting pod.\n") + _, _ = outf.WriteString("Starting pod.\n") pod, err := e.driver.NewPod(ctx, podConfig, outf) if err != nil { - outf.WriteString(fmt.Sprintf("Pod failed to start. Error: %s\n", err)) + _, _ = outf.WriteString(fmt.Sprintf("Pod failed to start. Error: %s\n", err)) return err } - outf.WriteString("Pod started.\n") + _, _ = outf.WriteString("Pod started.\n") if et.WorkingDir != "" { - outf.WriteString(fmt.Sprintf("Creating working dir %q.\n", et.WorkingDir)) + _, _ = outf.WriteString(fmt.Sprintf("Creating working dir %q.\n", et.WorkingDir)) if err := e.mkdir(ctx, et, pod, outf, et.WorkingDir); err != nil { - outf.WriteString(fmt.Sprintf("Failed to create working dir %q. Error: %s\n", et.WorkingDir, err)) + _, _ = outf.WriteString(fmt.Sprintf("Failed to create working dir %q. Error: %s\n", et.WorkingDir, err)) return err } } @@ -1028,7 +1027,7 @@ func (e *Executor) podsCleaner(ctx context.Context) error { if pod.ExecutorID() == e.id { if _, ok := e.runningTasks.get(taskID); !ok { log.Infof("removing pod %s for not running task: %s", pod.ID(), taskID) - pod.Remove(ctx) + _ = pod.Remove(ctx) } } @@ -1042,7 +1041,7 @@ func (e *Executor) podsCleaner(ctx context.Context) error { } if !owned { log.Infof("removing pod %s since it's not owned by any active executor", pod.ID()) - pod.Remove(ctx) + _ = pod.Remove(ctx) } } @@ -1174,7 +1173,9 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) { s.EndTime = util.TimePtr(time.Now()) } } - e.sendExecutorTaskStatus(ctx, et) + if err := e.sendExecutorTaskStatus(ctx, et); err != nil { + log.Errorf("err: %+v", err) + } } } } @@ -1262,12 +1263,6 @@ func (r *runningTasks) addIfNotExists(rtID string, rt *runningTask) bool { return true } -func (r *runningTasks) add(rtID string, rt *runningTask) { - r.m.Lock() - defer r.m.Unlock() - r.tasks[rtID] = rt -} - func (r *runningTasks) delete(rtID string) { r.m.Lock() defer r.m.Unlock()