From 9c7c589bbab29f9483f55feffa05feae6f9eff42 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Fri, 26 Apr 2019 10:15:23 +0200 Subject: [PATCH] runservice executor: use k8s client informers/listers Use k8s client informers/listers instead of polling the api every time --- go.mod | 1 + go.sum | 2 + .../runservice/executor/driver/k8s.go | 42 +++++++++++++++---- .../runservice/executor/driver/k8slease.go | 18 ++++---- 4 files changed, 46 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 1905538..dfee576 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/gorilla/handlers v1.4.0 github.com/gorilla/mux v1.7.0 github.com/hashicorp/go-sockaddr v1.0.1 + github.com/hashicorp/golang-lru v0.5.1 // indirect github.com/imdario/mergo v0.3.7 // indirect github.com/json-iterator/go v1.1.6 // indirect github.com/jtolds/gls v4.2.1+incompatible // indirect diff --git a/go.sum b/go.sum index ffc0d06..acdcf5c 100644 --- a/go.sum +++ b/go.sum @@ -100,6 +100,8 @@ github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brv github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-sockaddr v1.0.1 h1:eCkkJ5KOOktDvwbsE9KPyiBWaOfp1ZNy2gLHgL8PSBM= github.com/hashicorp/go-sockaddr v1.0.1/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A= +github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= +github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI= diff --git a/internal/services/runservice/executor/driver/k8s.go b/internal/services/runservice/executor/driver/k8s.go index 14e2449..fb91c6d 100644 --- a/internal/services/runservice/executor/driver/k8s.go +++ b/internal/services/runservice/executor/driver/k8s.go @@ -36,9 +36,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apilabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + coordinationlistersv1 "k8s.io/client-go/listers/coordination/v1" + listerscorev1 "k8s.io/client-go/listers/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/remotecommand" @@ -59,6 +62,7 @@ const ( renewExecutorLeaseInterval = 10 * time.Second staleExecutorLeaseInterval = 1 * time.Minute + informerResyncInterval = 10 * time.Second ) type K8sDriver struct { @@ -70,6 +74,10 @@ type K8sDriver struct { executorID string executorsGroupID string useLeaseAPI bool + nodeLister listerscorev1.NodeLister + podLister listerscorev1.PodLister + cmLister listerscorev1.ConfigMapLister + leaseLister coordinationlistersv1.LeaseLister } type K8sPod struct { @@ -168,6 +176,26 @@ func NewK8sDriver(logger *zap.Logger, executorID, toolboxPath string) (*K8sDrive } }() + factory := informers.NewSharedInformerFactoryWithOptions(d.client, informerResyncInterval, informers.WithNamespace(d.namespace)) + + nodeInformer := factory.Core().V1().Nodes() + d.nodeLister = nodeInformer.Lister() + go nodeInformer.Informer().Run(ctx.Done()) + + podInformer := factory.Core().V1().Pods() + d.podLister = podInformer.Lister() + go podInformer.Informer().Run(ctx.Done()) + + if d.useLeaseAPI { + leaseInformer := factory.Coordination().V1().Leases() + d.leaseLister = leaseInformer.Lister() + go leaseInformer.Informer().Run(ctx.Done()) + } else { + cmInformer := factory.Core().V1().ConfigMaps() + d.cmLister = cmInformer.Lister() + go cmInformer.Informer().Run(ctx.Done()) + } + return d, nil } @@ -201,14 +229,13 @@ func (d *K8sDriver) Setup(ctx context.Context) error { func (d *K8sDriver) Archs(ctx context.Context) ([]common.Arch, error) { // TODO(sgotti) use go client listers instead of querying every time - nodeClient := d.client.CoreV1().Nodes() - nodes, err := nodeClient.List(metav1.ListOptions{}) + nodes, err := d.nodeLister.List(apilabels.SelectorFromSet(nil)) if err != nil { return nil, err } archsMap := map[common.Arch]struct{}{} archs := []common.Arch{} - for _, node := range nodes.Items { + for _, node := range nodes { archsMap[common.ArchFromString(node.Status.NodeInfo.Architecture)] = struct{}{} } for arch := range archsMap { @@ -502,19 +529,16 @@ func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Wri } func (d *K8sDriver) GetPods(ctx context.Context, all bool) ([]Pod, error) { - podClient := d.client.CoreV1().Pods(d.namespace) - // get all pods for the executor group, also the ones managed by other executors in the same executor group labels := map[string]string{executorsGroupIDKey: d.executorsGroupID} - // TODO(sgotti) use go client listers instead of querying every time - k8sPods, err := podClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()}) + k8sPods, err := d.podLister.List(apilabels.SelectorFromSet(labels)) if err != nil { return nil, err } - pods := make([]Pod, len(k8sPods.Items)) - for i, k8sPod := range k8sPods.Items { + pods := make([]Pod, len(k8sPods)) + for i, k8sPod := range k8sPods { labels := map[string]string{} // keep only labels starting with our prefix for n, v := range k8sPod.Labels { diff --git a/internal/services/runservice/executor/driver/k8slease.go b/internal/services/runservice/executor/driver/k8slease.go index 4302d80..69874f7 100644 --- a/internal/services/runservice/executor/driver/k8slease.go +++ b/internal/services/runservice/executor/driver/k8slease.go @@ -140,7 +140,6 @@ func (d *K8sDriver) getLeases(ctx context.Context) ([]string, error) { labels := map[string]string{} labels[executorsGroupIDKey] = d.executorsGroupID - // TODO(sgotti) use go client listers instead of querying every time if d.useLeaseAPI { leaseClient := d.client.CoordinationV1().Leases(d.namespace) @@ -174,15 +173,14 @@ func (d *K8sDriver) cleanStaleExecutorsLease(ctx context.Context) error { labels := map[string]string{} labels[executorsGroupIDKey] = d.executorsGroupID - // TODO(sgotti) use go client listers instead of querying every time if d.useLeaseAPI { leaseClient := d.client.CoordinationV1().Leases(d.namespace) - leases, err := leaseClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()}) + leases, err := d.leaseLister.List(apilabels.SelectorFromSet(labels)) if err != nil { return err } - for _, lease := range leases.Items { + for _, lease := range leases { if lease.Spec.HolderIdentity == nil { d.log.Warnf("missing holder identity for lease %q", lease.Name) continue @@ -197,17 +195,19 @@ func (d *K8sDriver) cleanStaleExecutorsLease(ctx context.Context) error { } if lease.Spec.RenewTime.Add(staleExecutorLeaseInterval).Before(time.Now()) { d.log.Infof("deleting stale lease %q", lease.Name) - leaseClient.Delete(lease.Name, nil) + if err := leaseClient.Delete(lease.Name, nil); err != nil { + d.log.Errorf("failed to delete stale lease %q, err: %v", lease.Name, err) + } } } } else { cmClient := d.client.CoreV1().ConfigMaps(d.namespace) - cms, err := cmClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()}) + cms, err := d.cmLister.List(apilabels.SelectorFromSet(labels)) if err != nil { return err } - for _, cm := range cms.Items { + for _, cm := range cms { var ld *LeaseData if cm.Annotations == nil { // this shouldn't happen @@ -225,7 +225,9 @@ func (d *K8sDriver) cleanStaleExecutorsLease(ctx context.Context) error { } if ld.RenewTime.Add(staleExecutorLeaseInterval).Before(time.Now()) { d.log.Infof("deleting stale configmap lease %q", cm.Name) - cmClient.Delete(cm.Name, nil) + if err := cmClient.Delete(cm.Name, nil); err != nil { + d.log.Errorf("failed to delete stale configmap lease %q, err: %v", cm.Name, err) + } } } }