runservice executor: use k8s client informers/listers

Use k8s client informers/listers instead of polling the api every time
This commit is contained in:
Simone Gotti 2019-04-26 10:15:23 +02:00
parent 8989bd0e8e
commit 9c7c589bba
4 changed files with 46 additions and 17 deletions

1
go.mod
View File

@ -30,6 +30,7 @@ require (
github.com/gorilla/handlers v1.4.0
github.com/gorilla/mux v1.7.0
github.com/hashicorp/go-sockaddr v1.0.1
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/imdario/mergo v0.3.7 // indirect
github.com/json-iterator/go v1.1.6 // indirect
github.com/jtolds/gls v4.2.1+incompatible // indirect

2
go.sum
View File

@ -100,6 +100,8 @@ github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brv
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-sockaddr v1.0.1 h1:eCkkJ5KOOktDvwbsE9KPyiBWaOfp1ZNy2gLHgL8PSBM=
github.com/hashicorp/go-sockaddr v1.0.1/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI=

View File

@ -36,9 +36,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apilabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
coordinationlistersv1 "k8s.io/client-go/listers/coordination/v1"
listerscorev1 "k8s.io/client-go/listers/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
@ -59,6 +62,7 @@ const (
renewExecutorLeaseInterval = 10 * time.Second
staleExecutorLeaseInterval = 1 * time.Minute
informerResyncInterval = 10 * time.Second
)
type K8sDriver struct {
@ -70,6 +74,10 @@ type K8sDriver struct {
executorID string
executorsGroupID string
useLeaseAPI bool
nodeLister listerscorev1.NodeLister
podLister listerscorev1.PodLister
cmLister listerscorev1.ConfigMapLister
leaseLister coordinationlistersv1.LeaseLister
}
type K8sPod struct {
@ -168,6 +176,26 @@ func NewK8sDriver(logger *zap.Logger, executorID, toolboxPath string) (*K8sDrive
}
}()
factory := informers.NewSharedInformerFactoryWithOptions(d.client, informerResyncInterval, informers.WithNamespace(d.namespace))
nodeInformer := factory.Core().V1().Nodes()
d.nodeLister = nodeInformer.Lister()
go nodeInformer.Informer().Run(ctx.Done())
podInformer := factory.Core().V1().Pods()
d.podLister = podInformer.Lister()
go podInformer.Informer().Run(ctx.Done())
if d.useLeaseAPI {
leaseInformer := factory.Coordination().V1().Leases()
d.leaseLister = leaseInformer.Lister()
go leaseInformer.Informer().Run(ctx.Done())
} else {
cmInformer := factory.Core().V1().ConfigMaps()
d.cmLister = cmInformer.Lister()
go cmInformer.Informer().Run(ctx.Done())
}
return d, nil
}
@ -201,14 +229,13 @@ func (d *K8sDriver) Setup(ctx context.Context) error {
func (d *K8sDriver) Archs(ctx context.Context) ([]common.Arch, error) {
// TODO(sgotti) use go client listers instead of querying every time
nodeClient := d.client.CoreV1().Nodes()
nodes, err := nodeClient.List(metav1.ListOptions{})
nodes, err := d.nodeLister.List(apilabels.SelectorFromSet(nil))
if err != nil {
return nil, err
}
archsMap := map[common.Arch]struct{}{}
archs := []common.Arch{}
for _, node := range nodes.Items {
for _, node := range nodes {
archsMap[common.ArchFromString(node.Status.NodeInfo.Architecture)] = struct{}{}
}
for arch := range archsMap {
@ -502,19 +529,16 @@ func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Wri
}
func (d *K8sDriver) GetPods(ctx context.Context, all bool) ([]Pod, error) {
podClient := d.client.CoreV1().Pods(d.namespace)
// get all pods for the executor group, also the ones managed by other executors in the same executor group
labels := map[string]string{executorsGroupIDKey: d.executorsGroupID}
// TODO(sgotti) use go client listers instead of querying every time
k8sPods, err := podClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()})
k8sPods, err := d.podLister.List(apilabels.SelectorFromSet(labels))
if err != nil {
return nil, err
}
pods := make([]Pod, len(k8sPods.Items))
for i, k8sPod := range k8sPods.Items {
pods := make([]Pod, len(k8sPods))
for i, k8sPod := range k8sPods {
labels := map[string]string{}
// keep only labels starting with our prefix
for n, v := range k8sPod.Labels {

View File

@ -140,7 +140,6 @@ func (d *K8sDriver) getLeases(ctx context.Context) ([]string, error) {
labels := map[string]string{}
labels[executorsGroupIDKey] = d.executorsGroupID
// TODO(sgotti) use go client listers instead of querying every time
if d.useLeaseAPI {
leaseClient := d.client.CoordinationV1().Leases(d.namespace)
@ -174,15 +173,14 @@ func (d *K8sDriver) cleanStaleExecutorsLease(ctx context.Context) error {
labels := map[string]string{}
labels[executorsGroupIDKey] = d.executorsGroupID
// TODO(sgotti) use go client listers instead of querying every time
if d.useLeaseAPI {
leaseClient := d.client.CoordinationV1().Leases(d.namespace)
leases, err := leaseClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()})
leases, err := d.leaseLister.List(apilabels.SelectorFromSet(labels))
if err != nil {
return err
}
for _, lease := range leases.Items {
for _, lease := range leases {
if lease.Spec.HolderIdentity == nil {
d.log.Warnf("missing holder identity for lease %q", lease.Name)
continue
@ -197,17 +195,19 @@ func (d *K8sDriver) cleanStaleExecutorsLease(ctx context.Context) error {
}
if lease.Spec.RenewTime.Add(staleExecutorLeaseInterval).Before(time.Now()) {
d.log.Infof("deleting stale lease %q", lease.Name)
leaseClient.Delete(lease.Name, nil)
if err := leaseClient.Delete(lease.Name, nil); err != nil {
d.log.Errorf("failed to delete stale lease %q, err: %v", lease.Name, err)
}
}
}
} else {
cmClient := d.client.CoreV1().ConfigMaps(d.namespace)
cms, err := cmClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()})
cms, err := d.cmLister.List(apilabels.SelectorFromSet(labels))
if err != nil {
return err
}
for _, cm := range cms.Items {
for _, cm := range cms {
var ld *LeaseData
if cm.Annotations == nil {
// this shouldn't happen
@ -225,7 +225,9 @@ func (d *K8sDriver) cleanStaleExecutorsLease(ctx context.Context) error {
}
if ld.RenewTime.Add(staleExecutorLeaseInterval).Before(time.Now()) {
d.log.Infof("deleting stale configmap lease %q", cm.Name)
cmClient.Delete(cm.Name, nil)
if err := cmClient.Delete(cm.Name, nil); err != nil {
d.log.Errorf("failed to delete stale configmap lease %q, err: %v", cm.Name, err)
}
}
}
}