From 4b67dd37911968f75a9bb5f590fcf758cdf41575 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 18 Oct 2020 01:59:38 -0700 Subject: [PATCH] simplify --- controllers/controler_util.go | 208 ++++++++++++++++++++++++++++++- controllers/controller_filer.go | 55 ++++---- controllers/controller_master.go | 22 +--- controllers/controller_volume.go | 67 +++------- go.mod | 1 + 5 files changed, 248 insertions(+), 105 deletions(-) diff --git a/controllers/controler_util.go b/controllers/controler_util.go index 28e4caf..0d43171 100644 --- a/controllers/controler_util.go +++ b/controllers/controler_util.go @@ -2,8 +2,11 @@ package controllers import ( "context" + "encoding/json" "fmt" seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" extensionsv1beta1 "k8s.io/api/extensions/v1beta1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" @@ -12,6 +15,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -54,12 +58,20 @@ func createIngress(seaweedCR *seaweedv1.Seaweed, svcName string, port int) *exte // the following is adapted from tidb-operator/pkg/controller/generic_control.go +const ( + // LastAppliedPodTemplate is annotation key of the last applied pod template + LastAppliedPodTemplate = "seaweedfs.com/last-applied-podtemplate" + + // LastAppliedConfigAnnotation is annotation key of last applied configuration + LastAppliedConfigAnnotation = "seaweedfs.com/last-applied-configuration" +) + type MergeFn func(existing, desired runtime.Object) error // CreateOrUpdate create an object to the Kubernetes cluster for controller, if the object to create is existed, // call mergeFn to merge the change in new object to the existing object, then update the existing object. // The object will also be adopted by the given controller. -func (r *SeaweedReconciler) CreateOrUpdate(controller, obj runtime.Object, mergeFn MergeFn) (runtime.Object, error) { +func (r *SeaweedReconciler) CreateOrUpdate(obj runtime.Object, mergeFn MergeFn) (runtime.Object, error) { // controller-runtime/client will mutate the object pointer in-place, // to be consistent with other methods in our controller, we copy the object @@ -102,6 +114,149 @@ func (r *SeaweedReconciler) CreateOrUpdate(controller, obj runtime.Object, merge return desired, err } +func (r *SeaweedReconciler) addSpecToAnnotation(d *appsv1.Deployment) error { + b, err := json.Marshal(d.Spec.Template.Spec) + if err != nil { + return err + } + if d.Annotations == nil { + d.Annotations = map[string]string{} + } + d.Annotations[LastAppliedPodTemplate] = string(b) + return nil +} + +func (r *SeaweedReconciler) CreateOrUpdateDeployment(deploy *appsv1.Deployment) (*appsv1.Deployment, error) { + r.addSpecToAnnotation(deploy) + result, err := r.CreateOrUpdate(deploy, func(existing, desired runtime.Object) error { + existingDep := existing.(*appsv1.Deployment) + desiredDep := desired.(*appsv1.Deployment) + + existingDep.Spec.Replicas = desiredDep.Spec.Replicas + existingDep.Labels = desiredDep.Labels + + if existingDep.Annotations == nil { + existingDep.Annotations = map[string]string{} + } + for k, v := range desiredDep.Annotations { + existingDep.Annotations[k] = v + } + // only override the default strategy if it is explicitly set in the desiredDep + if string(desiredDep.Spec.Strategy.Type) != "" { + existingDep.Spec.Strategy.Type = desiredDep.Spec.Strategy.Type + if existingDep.Spec.Strategy.RollingUpdate != nil { + existingDep.Spec.Strategy.RollingUpdate = desiredDep.Spec.Strategy.RollingUpdate + } + } + // pod selector of deployment is immutable, so we don't mutate the labels of pod + for k, v := range desiredDep.Spec.Template.Annotations { + existingDep.Spec.Template.Annotations[k] = v + } + // podSpec of deployment is hard to merge, use an annotation to assist + if DeploymentPodSpecChanged(desiredDep, existingDep) { + // Record last applied spec in favor of future equality check + b, err := json.Marshal(desiredDep.Spec.Template.Spec) + if err != nil { + return err + } + existingDep.Annotations[LastAppliedConfigAnnotation] = string(b) + existingDep.Spec.Template.Spec = desiredDep.Spec.Template.Spec + } + return nil + }) + if err != nil { + return nil, err + } + return result.(*appsv1.Deployment), err +} + +func (r *SeaweedReconciler) CreateOrUpdateService(svc *corev1.Service) (*corev1.Service, error) { + result, err := r.CreateOrUpdate(svc, func(existing, desired runtime.Object) error { + existingSvc := existing.(*corev1.Service) + desiredSvc := desired.(*corev1.Service) + + if existingSvc.Annotations == nil { + existingSvc.Annotations = map[string]string{} + } + for k, v := range desiredSvc.Annotations { + existingSvc.Annotations[k] = v + } + existingSvc.Labels = desiredSvc.Labels + equal, err := ServiceEqual(desiredSvc, existingSvc) + if err != nil { + return err + } + if !equal { + // record desiredSvc Spec in annotations in favor of future equality checks + b, err := json.Marshal(desiredSvc.Spec) + if err != nil { + return err + } + existingSvc.Annotations[LastAppliedConfigAnnotation] = string(b) + clusterIp := existingSvc.Spec.ClusterIP + ports := existingSvc.Spec.Ports + serviceType := existingSvc.Spec.Type + + existingSvc.Spec = desiredSvc.Spec + existingSvc.Spec.ClusterIP = clusterIp + + // If the existed service and the desired service is NodePort or LoadBalancerType, we should keep the nodePort unchanged. + if (serviceType == corev1.ServiceTypeNodePort || serviceType == corev1.ServiceTypeLoadBalancer) && + (desiredSvc.Spec.Type == corev1.ServiceTypeNodePort || desiredSvc.Spec.Type == corev1.ServiceTypeLoadBalancer) { + for i, dport := range existingSvc.Spec.Ports { + for _, eport := range ports { + // Because the portName could be edited, + // we use Port number to link the desired Service Port and the existed Service Port in the nested loop + if dport.Port == eport.Port && dport.Protocol == eport.Protocol { + dport.NodePort = eport.NodePort + existingSvc.Spec.Ports[i] = dport + break + } + } + } + } + } + return nil + }) + if err != nil { + return nil, err + } + return result.(*corev1.Service), nil +} + +func (r *SeaweedReconciler) CreateOrUpdateIngress(ingress *extensionsv1beta1.Ingress) (*extensionsv1beta1.Ingress, error) { + result, err := r.CreateOrUpdate(ingress, func(existing, desired runtime.Object) error { + existingIngress := existing.(*extensionsv1beta1.Ingress) + desiredIngress := desired.(*extensionsv1beta1.Ingress) + + if existingIngress.Annotations == nil { + existingIngress.Annotations = map[string]string{} + } + for k, v := range desiredIngress.Annotations { + existingIngress.Annotations[k] = v + } + existingIngress.Labels = desiredIngress.Labels + equal, err := IngressEqual(desiredIngress, existingIngress) + if err != nil { + return err + } + if !equal { + // record desiredIngress Spec in annotations in favor of future equality checks + b, err := json.Marshal(desiredIngress.Spec) + if err != nil { + return err + } + existingIngress.Annotations[LastAppliedConfigAnnotation] = string(b) + existingIngress.Spec = desiredIngress.Spec + } + return nil + }) + if err != nil { + return nil, err + } + return result.(*extensionsv1beta1.Ingress), nil +} + // EmptyClone create an clone of the resource with the same name and namespace (if namespace-scoped), with other fields unset func EmptyClone(obj runtime.Object) (runtime.Object, error) { meta, ok := obj.(metav1.Object) @@ -136,3 +291,54 @@ func InferObjectKind(obj runtime.Object) (schema.GroupVersionKind, error) { } return gvks[0], nil } + +// GetDeploymentLastAppliedPodTemplate set last applied pod template from Deployment's annotation +func GetDeploymentLastAppliedPodTemplate(dep *appsv1.Deployment) (*corev1.PodSpec, error) { + applied, ok := dep.Annotations[LastAppliedPodTemplate] + if !ok { + return nil, fmt.Errorf("deployment:[%s/%s] not found spec's apply config", dep.GetNamespace(), dep.GetName()) + } + podSpec := &corev1.PodSpec{} + err := json.Unmarshal([]byte(applied), podSpec) + if err != nil { + return nil, err + } + return podSpec, nil +} + +// DeploymentPodSpecChanged checks whether the new deployment differs with the old one's last-applied-config +func DeploymentPodSpecChanged(newDep *appsv1.Deployment, oldDep *appsv1.Deployment) bool { + lastAppliedPodTemplate, err := GetDeploymentLastAppliedPodTemplate(oldDep) + if err != nil { + klog.Warningf("error get last-applied-config of deployment %s/%s: %v", oldDep.Namespace, oldDep.Name, err) + return true + } + return !apiequality.Semantic.DeepEqual(newDep.Spec.Template.Spec, lastAppliedPodTemplate) +} + +// ServiceEqual compares the new Service's spec with old Service's last applied config +func ServiceEqual(newSvc, oldSvc *corev1.Service) (bool, error) { + oldSpec := corev1.ServiceSpec{} + if lastAppliedConfig, ok := oldSvc.Annotations[LastAppliedConfigAnnotation]; ok { + err := json.Unmarshal([]byte(lastAppliedConfig), &oldSpec) + if err != nil { + klog.Errorf("unmarshal ServiceSpec: [%s/%s]'s applied config failed,error: %v", oldSvc.GetNamespace(), oldSvc.GetName(), err) + return false, err + } + return apiequality.Semantic.DeepEqual(oldSpec, newSvc.Spec), nil + } + return false, nil +} + +func IngressEqual(newIngress, oldIngres *extensionsv1beta1.Ingress) (bool, error) { + oldIngressSpec := extensionsv1beta1.IngressSpec{} + if lastAppliedConfig, ok := oldIngres.Annotations[LastAppliedConfigAnnotation]; ok { + err := json.Unmarshal([]byte(lastAppliedConfig), &oldIngressSpec) + if err != nil { + klog.Errorf("unmarshal IngressSpec: [%s/%s]'s applied config failed,error: %v", oldIngres.GetNamespace(), oldIngres.GetName(), err) + return false, err + } + return apiequality.Semantic.DeepEqual(oldIngressSpec, newIngress.Spec), nil + } + return false, nil +} diff --git a/controllers/controller_filer.go b/controllers/controller_filer.go index 0492d51..a4b56f6 100644 --- a/controllers/controller_filer.go +++ b/controllers/controller_filer.go @@ -5,9 +5,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" @@ -33,8 +30,10 @@ func (r *SeaweedReconciler) ensureFilerServers(seaweedCR *seaweedv1.Seaweed) (do } func (r *SeaweedReconciler) ensureFilerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { + log := r.Log.WithValues("sw-filer-statefulset", seaweedCR.Name) + filerStatefulSet := r.createFilerStatefulSet(seaweedCR) - _, err := r.CreateOrUpdate(seaweedCR, filerStatefulSet, func(existing, desired runtime.Object) error { + _, err := r.CreateOrUpdate(filerStatefulSet, func(existing, desired runtime.Object) error { existingStatefulSet := existing.(*appsv1.StatefulSet) desiredStatefulSet := desired.(*appsv1.StatefulSet) @@ -42,44 +41,34 @@ func (r *SeaweedReconciler) ensureFilerStatefulSet(seaweedCR *seaweedv1.Seaweed) existingStatefulSet.Spec.Template.Spec.Containers[0].Image = desiredStatefulSet.Spec.Template.Spec.Containers[0].Image return nil }) + log.Info("ensure filer stateful set " + filerStatefulSet.Name) return ReconcileResult(err) } func (r *SeaweedReconciler) ensureFilerHeadlessService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { - return r.ensureService(seaweedCR, "filer-headless", r.createFilerHeadlessService) + + log := r.Log.WithValues("sw-filer-headless-service", seaweedCR.Name) + + filerHeadlessService := r.createFilerHeadlessService(seaweedCR) + _, err := r.CreateOrUpdateService(filerHeadlessService) + + log.Info("ensure filer headless service " + filerHeadlessService.Name) + + return ReconcileResult(err) } func (r *SeaweedReconciler) ensureFilerNodePortService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { - return r.ensureService(seaweedCR, "filer", r.createFilerNodePortService) + + log := r.Log.WithValues("sw-filer-service", seaweedCR.Name) + + filerService := r.createFilerNodePortService(seaweedCR) + _, err := r.CreateOrUpdateService(filerService) + + log.Info("ensure filer service " + filerService.Name) + + return ReconcileResult(err) } func labelsForFiler(name string) map[string]string { return map[string]string{"app": "seaweedfs", "role": "filer", "name": name} } - -type CreateServiceFunc func(m *seaweedv1.Seaweed) *corev1.Service - -func (r *SeaweedReconciler) ensureService(seaweedCR *seaweedv1.Seaweed, nameSuffix string, serviceFunc CreateServiceFunc) (bool, ctrl.Result, error) { - ctx := context.Background() - log := r.Log.WithValues("sw", seaweedCR.Name, "service", nameSuffix) - - aService := &corev1.Service{} - err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-" + nameSuffix, Namespace: seaweedCR.Namespace}, aService) - if err != nil && errors.IsNotFound(err) { - // Define a new deployment - dep := serviceFunc(seaweedCR) - log.Info("Creating a new service", "Namespace", dep.Namespace, "Name", dep.Name) - err = r.Create(ctx, dep) - if err != nil { - log.Error(err, "Failed to create service", "Namespace", dep.Namespace, "Name", dep.Name) - return ReconcileResult(err) - } - // Deployment created successfully - return and requeue - return ReconcileResult(err) - } else if err != nil { - log.Error(err, "Failed to get server service") - return ReconcileResult(err) - } - log.Info("Get service " + aService.Name) - return ReconcileResult(err) -} diff --git a/controllers/controller_master.go b/controllers/controller_master.go index 08fcef2..8452737 100644 --- a/controllers/controller_master.go +++ b/controllers/controller_master.go @@ -5,7 +5,6 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -72,26 +71,11 @@ func (r *SeaweedReconciler) ensureMasterStatefulSet(seaweedCR *seaweedv1.Seaweed } func (r *SeaweedReconciler) ensureMasterService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { - ctx := context.Background() log := r.Log.WithValues("sw-master-service", seaweedCR.Name) - masterService := &corev1.Service{} - err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-master", Namespace: seaweedCR.Namespace}, masterService) - if err != nil && errors.IsNotFound(err) { - // Define a new deployment - dep := r.createMasterService(seaweedCR) - log.Info("Creating a new master service", "Namespace", dep.Namespace, "Name", dep.Name) - err = r.Create(ctx, dep) - if err != nil { - log.Error(err, "Failed to create master service", "Namespace", dep.Namespace, "Name", dep.Name) - return ReconcileResult(err) - } - // Deployment created successfully - return and requeue - return ReconcileResult(err) - } else if err != nil { - log.Error(err, "Failed to get master service", "Namespace", seaweedCR.Namespace, "Name", seaweedCR.Name+"-master") - return ReconcileResult(err) - } + masterService := r.createMasterService(seaweedCR) + _, err := r.CreateOrUpdateService(masterService) + log.Info("Get master service " + masterService.Name) return ReconcileResult(err) diff --git a/controllers/controller_volume.go b/controllers/controller_volume.go index dbd7709..cd77c6e 100644 --- a/controllers/controller_volume.go +++ b/controllers/controller_volume.go @@ -2,11 +2,9 @@ package controllers import ( "context" + "k8s.io/apimachinery/pkg/runtime" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" @@ -28,65 +26,30 @@ func (r *SeaweedReconciler) ensureVolumeServers(seaweedCR *seaweedv1.Seaweed) (d } func (r *SeaweedReconciler) ensureVolumeServerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { - ctx := context.Background() log := r.Log.WithValues("sw-volume-statefulset", seaweedCR.Name) - volumeServerStatefulSet := &appsv1.StatefulSet{} - err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-volume", Namespace: seaweedCR.Namespace}, volumeServerStatefulSet) - if err != nil && errors.IsNotFound(err) { - // Define a new deployment - dep := r.createVolumeServerStatefulSet(seaweedCR) - log.Info("Creating a new volume statefulset", "Namespace", dep.Namespace, "Name", dep.Name) - err = r.Create(ctx, dep) - if err != nil { - log.Error(err, "Failed to create new volume statefulset", "Namespace", dep.Namespace, "Name", dep.Name) - return ReconcileResult(err) - } - // Deployment created successfully - return and requeue - return ReconcileResult(err) - } else if err != nil { - log.Error(err, "Failed to get volume server statefulset") - return ReconcileResult(err) - } + volumeServerStatefulSet := r.createVolumeServerStatefulSet(seaweedCR) + _, err := r.CreateOrUpdate(volumeServerStatefulSet, func(existing, desired runtime.Object) error { + existingStatefulSet := existing.(*appsv1.StatefulSet) + desiredStatefulSet := desired.(*appsv1.StatefulSet) - if *volumeServerStatefulSet.Spec.Replicas != seaweedCR.Spec.VolumeServerCount || - volumeServerStatefulSet.Spec.Template.Spec.Containers[0].Image != seaweedCR.Spec.Image { - volumeServerStatefulSet.Spec.Replicas = &seaweedCR.Spec.VolumeServerCount - volumeServerStatefulSet.Spec.Template.Spec.Containers[0].Image = seaweedCR.Spec.Image - if err = r.Update(ctx, volumeServerStatefulSet); err != nil { - log.Error(err, "Failed to update volume statefulset", "Namespace", volumeServerStatefulSet.Namespace, "Name", volumeServerStatefulSet.Name) - return ReconcileResult(err) - } - // Deployment created successfully - return and requeue - return ReconcileResult(err) - } + existingStatefulSet.Spec.Replicas = desiredStatefulSet.Spec.Replicas + existingStatefulSet.Spec.Template.Spec.Containers[0].Image = desiredStatefulSet.Spec.Template.Spec.Containers[0].Image + return nil + }) - log.Info("Get volume stateful set " + volumeServerStatefulSet.Name) + log.Info("ensure volume stateful set " + volumeServerStatefulSet.Name) return ReconcileResult(err) } func (r *SeaweedReconciler) ensureVolumeServerService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { - ctx := context.Background() + log := r.Log.WithValues("sw-volume-service", seaweedCR.Name) - volumeServerService := &corev1.Service{} - err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-volume", Namespace: seaweedCR.Namespace}, volumeServerService) - if err != nil && errors.IsNotFound(err) { - // Define a new deployment - dep := r.createVolumeServerService(seaweedCR) - log.Info("Creating a new volume service", "Namespace", dep.Namespace, "Name", dep.Name) - err = r.Create(ctx, dep) - if err != nil { - log.Error(err, "Failed to create new volume service", "Namespace", dep.Namespace, "Name", dep.Name) - return ReconcileResult(err) - } - // Deployment created successfully - return and requeue - return ReconcileResult(err) - } else if err != nil { - log.Error(err, "Failed to get volume server service") - return ReconcileResult(err) - } - log.Info("Get volume service " + volumeServerService.Name) + volumeServerService := r.createVolumeServerService(seaweedCR) + _, err := r.CreateOrUpdateService(volumeServerService) + + log.Info("ensure volume service " + volumeServerService.Name) return ReconcileResult(err) } diff --git a/go.mod b/go.mod index 3617aff..7cad228 100644 --- a/go.mod +++ b/go.mod @@ -9,5 +9,6 @@ require ( k8s.io/api v0.18.2 k8s.io/apimachinery v0.18.2 k8s.io/client-go v0.18.2 + k8s.io/klog v1.0.0 sigs.k8s.io/controller-runtime v0.6.0 )