simplify
This commit is contained in:
parent
fb816896d7
commit
4b67dd3791
|
@ -2,8 +2,11 @@ package controllers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
||||||
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
|
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
@ -12,6 +15,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
"k8s.io/client-go/kubernetes/scheme"
|
"k8s.io/client-go/kubernetes/scheme"
|
||||||
|
"k8s.io/klog"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -54,12 +58,20 @@ func createIngress(seaweedCR *seaweedv1.Seaweed, svcName string, port int) *exte
|
||||||
|
|
||||||
// the following is adapted from tidb-operator/pkg/controller/generic_control.go
|
// the following is adapted from tidb-operator/pkg/controller/generic_control.go
|
||||||
|
|
||||||
|
const (
|
||||||
|
// LastAppliedPodTemplate is annotation key of the last applied pod template
|
||||||
|
LastAppliedPodTemplate = "seaweedfs.com/last-applied-podtemplate"
|
||||||
|
|
||||||
|
// LastAppliedConfigAnnotation is annotation key of last applied configuration
|
||||||
|
LastAppliedConfigAnnotation = "seaweedfs.com/last-applied-configuration"
|
||||||
|
)
|
||||||
|
|
||||||
type MergeFn func(existing, desired runtime.Object) error
|
type MergeFn func(existing, desired runtime.Object) error
|
||||||
|
|
||||||
// CreateOrUpdate create an object to the Kubernetes cluster for controller, if the object to create is existed,
|
// CreateOrUpdate create an object to the Kubernetes cluster for controller, if the object to create is existed,
|
||||||
// call mergeFn to merge the change in new object to the existing object, then update the existing object.
|
// call mergeFn to merge the change in new object to the existing object, then update the existing object.
|
||||||
// The object will also be adopted by the given controller.
|
// The object will also be adopted by the given controller.
|
||||||
func (r *SeaweedReconciler) CreateOrUpdate(controller, obj runtime.Object, mergeFn MergeFn) (runtime.Object, error) {
|
func (r *SeaweedReconciler) CreateOrUpdate(obj runtime.Object, mergeFn MergeFn) (runtime.Object, error) {
|
||||||
|
|
||||||
// controller-runtime/client will mutate the object pointer in-place,
|
// controller-runtime/client will mutate the object pointer in-place,
|
||||||
// to be consistent with other methods in our controller, we copy the object
|
// to be consistent with other methods in our controller, we copy the object
|
||||||
|
@ -102,6 +114,149 @@ func (r *SeaweedReconciler) CreateOrUpdate(controller, obj runtime.Object, merge
|
||||||
return desired, err
|
return desired, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *SeaweedReconciler) addSpecToAnnotation(d *appsv1.Deployment) error {
|
||||||
|
b, err := json.Marshal(d.Spec.Template.Spec)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if d.Annotations == nil {
|
||||||
|
d.Annotations = map[string]string{}
|
||||||
|
}
|
||||||
|
d.Annotations[LastAppliedPodTemplate] = string(b)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *SeaweedReconciler) CreateOrUpdateDeployment(deploy *appsv1.Deployment) (*appsv1.Deployment, error) {
|
||||||
|
r.addSpecToAnnotation(deploy)
|
||||||
|
result, err := r.CreateOrUpdate(deploy, func(existing, desired runtime.Object) error {
|
||||||
|
existingDep := existing.(*appsv1.Deployment)
|
||||||
|
desiredDep := desired.(*appsv1.Deployment)
|
||||||
|
|
||||||
|
existingDep.Spec.Replicas = desiredDep.Spec.Replicas
|
||||||
|
existingDep.Labels = desiredDep.Labels
|
||||||
|
|
||||||
|
if existingDep.Annotations == nil {
|
||||||
|
existingDep.Annotations = map[string]string{}
|
||||||
|
}
|
||||||
|
for k, v := range desiredDep.Annotations {
|
||||||
|
existingDep.Annotations[k] = v
|
||||||
|
}
|
||||||
|
// only override the default strategy if it is explicitly set in the desiredDep
|
||||||
|
if string(desiredDep.Spec.Strategy.Type) != "" {
|
||||||
|
existingDep.Spec.Strategy.Type = desiredDep.Spec.Strategy.Type
|
||||||
|
if existingDep.Spec.Strategy.RollingUpdate != nil {
|
||||||
|
existingDep.Spec.Strategy.RollingUpdate = desiredDep.Spec.Strategy.RollingUpdate
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// pod selector of deployment is immutable, so we don't mutate the labels of pod
|
||||||
|
for k, v := range desiredDep.Spec.Template.Annotations {
|
||||||
|
existingDep.Spec.Template.Annotations[k] = v
|
||||||
|
}
|
||||||
|
// podSpec of deployment is hard to merge, use an annotation to assist
|
||||||
|
if DeploymentPodSpecChanged(desiredDep, existingDep) {
|
||||||
|
// Record last applied spec in favor of future equality check
|
||||||
|
b, err := json.Marshal(desiredDep.Spec.Template.Spec)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
existingDep.Annotations[LastAppliedConfigAnnotation] = string(b)
|
||||||
|
existingDep.Spec.Template.Spec = desiredDep.Spec.Template.Spec
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return result.(*appsv1.Deployment), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *SeaweedReconciler) CreateOrUpdateService(svc *corev1.Service) (*corev1.Service, error) {
|
||||||
|
result, err := r.CreateOrUpdate(svc, func(existing, desired runtime.Object) error {
|
||||||
|
existingSvc := existing.(*corev1.Service)
|
||||||
|
desiredSvc := desired.(*corev1.Service)
|
||||||
|
|
||||||
|
if existingSvc.Annotations == nil {
|
||||||
|
existingSvc.Annotations = map[string]string{}
|
||||||
|
}
|
||||||
|
for k, v := range desiredSvc.Annotations {
|
||||||
|
existingSvc.Annotations[k] = v
|
||||||
|
}
|
||||||
|
existingSvc.Labels = desiredSvc.Labels
|
||||||
|
equal, err := ServiceEqual(desiredSvc, existingSvc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !equal {
|
||||||
|
// record desiredSvc Spec in annotations in favor of future equality checks
|
||||||
|
b, err := json.Marshal(desiredSvc.Spec)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
existingSvc.Annotations[LastAppliedConfigAnnotation] = string(b)
|
||||||
|
clusterIp := existingSvc.Spec.ClusterIP
|
||||||
|
ports := existingSvc.Spec.Ports
|
||||||
|
serviceType := existingSvc.Spec.Type
|
||||||
|
|
||||||
|
existingSvc.Spec = desiredSvc.Spec
|
||||||
|
existingSvc.Spec.ClusterIP = clusterIp
|
||||||
|
|
||||||
|
// If the existed service and the desired service is NodePort or LoadBalancerType, we should keep the nodePort unchanged.
|
||||||
|
if (serviceType == corev1.ServiceTypeNodePort || serviceType == corev1.ServiceTypeLoadBalancer) &&
|
||||||
|
(desiredSvc.Spec.Type == corev1.ServiceTypeNodePort || desiredSvc.Spec.Type == corev1.ServiceTypeLoadBalancer) {
|
||||||
|
for i, dport := range existingSvc.Spec.Ports {
|
||||||
|
for _, eport := range ports {
|
||||||
|
// Because the portName could be edited,
|
||||||
|
// we use Port number to link the desired Service Port and the existed Service Port in the nested loop
|
||||||
|
if dport.Port == eport.Port && dport.Protocol == eport.Protocol {
|
||||||
|
dport.NodePort = eport.NodePort
|
||||||
|
existingSvc.Spec.Ports[i] = dport
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return result.(*corev1.Service), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *SeaweedReconciler) CreateOrUpdateIngress(ingress *extensionsv1beta1.Ingress) (*extensionsv1beta1.Ingress, error) {
|
||||||
|
result, err := r.CreateOrUpdate(ingress, func(existing, desired runtime.Object) error {
|
||||||
|
existingIngress := existing.(*extensionsv1beta1.Ingress)
|
||||||
|
desiredIngress := desired.(*extensionsv1beta1.Ingress)
|
||||||
|
|
||||||
|
if existingIngress.Annotations == nil {
|
||||||
|
existingIngress.Annotations = map[string]string{}
|
||||||
|
}
|
||||||
|
for k, v := range desiredIngress.Annotations {
|
||||||
|
existingIngress.Annotations[k] = v
|
||||||
|
}
|
||||||
|
existingIngress.Labels = desiredIngress.Labels
|
||||||
|
equal, err := IngressEqual(desiredIngress, existingIngress)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !equal {
|
||||||
|
// record desiredIngress Spec in annotations in favor of future equality checks
|
||||||
|
b, err := json.Marshal(desiredIngress.Spec)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
existingIngress.Annotations[LastAppliedConfigAnnotation] = string(b)
|
||||||
|
existingIngress.Spec = desiredIngress.Spec
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return result.(*extensionsv1beta1.Ingress), nil
|
||||||
|
}
|
||||||
|
|
||||||
// EmptyClone create an clone of the resource with the same name and namespace (if namespace-scoped), with other fields unset
|
// EmptyClone create an clone of the resource with the same name and namespace (if namespace-scoped), with other fields unset
|
||||||
func EmptyClone(obj runtime.Object) (runtime.Object, error) {
|
func EmptyClone(obj runtime.Object) (runtime.Object, error) {
|
||||||
meta, ok := obj.(metav1.Object)
|
meta, ok := obj.(metav1.Object)
|
||||||
|
@ -136,3 +291,54 @@ func InferObjectKind(obj runtime.Object) (schema.GroupVersionKind, error) {
|
||||||
}
|
}
|
||||||
return gvks[0], nil
|
return gvks[0], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetDeploymentLastAppliedPodTemplate set last applied pod template from Deployment's annotation
|
||||||
|
func GetDeploymentLastAppliedPodTemplate(dep *appsv1.Deployment) (*corev1.PodSpec, error) {
|
||||||
|
applied, ok := dep.Annotations[LastAppliedPodTemplate]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("deployment:[%s/%s] not found spec's apply config", dep.GetNamespace(), dep.GetName())
|
||||||
|
}
|
||||||
|
podSpec := &corev1.PodSpec{}
|
||||||
|
err := json.Unmarshal([]byte(applied), podSpec)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return podSpec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeploymentPodSpecChanged checks whether the new deployment differs with the old one's last-applied-config
|
||||||
|
func DeploymentPodSpecChanged(newDep *appsv1.Deployment, oldDep *appsv1.Deployment) bool {
|
||||||
|
lastAppliedPodTemplate, err := GetDeploymentLastAppliedPodTemplate(oldDep)
|
||||||
|
if err != nil {
|
||||||
|
klog.Warningf("error get last-applied-config of deployment %s/%s: %v", oldDep.Namespace, oldDep.Name, err)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return !apiequality.Semantic.DeepEqual(newDep.Spec.Template.Spec, lastAppliedPodTemplate)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServiceEqual compares the new Service's spec with old Service's last applied config
|
||||||
|
func ServiceEqual(newSvc, oldSvc *corev1.Service) (bool, error) {
|
||||||
|
oldSpec := corev1.ServiceSpec{}
|
||||||
|
if lastAppliedConfig, ok := oldSvc.Annotations[LastAppliedConfigAnnotation]; ok {
|
||||||
|
err := json.Unmarshal([]byte(lastAppliedConfig), &oldSpec)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("unmarshal ServiceSpec: [%s/%s]'s applied config failed,error: %v", oldSvc.GetNamespace(), oldSvc.GetName(), err)
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return apiequality.Semantic.DeepEqual(oldSpec, newSvc.Spec), nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func IngressEqual(newIngress, oldIngres *extensionsv1beta1.Ingress) (bool, error) {
|
||||||
|
oldIngressSpec := extensionsv1beta1.IngressSpec{}
|
||||||
|
if lastAppliedConfig, ok := oldIngres.Annotations[LastAppliedConfigAnnotation]; ok {
|
||||||
|
err := json.Unmarshal([]byte(lastAppliedConfig), &oldIngressSpec)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("unmarshal IngressSpec: [%s/%s]'s applied config failed,error: %v", oldIngres.GetNamespace(), oldIngres.GetName(), err)
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return apiequality.Semantic.DeepEqual(oldIngressSpec, newIngress.Spec), nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
|
@ -5,9 +5,6 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
|
||||||
appsv1 "k8s.io/api/apps/v1"
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
corev1 "k8s.io/api/core/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
"k8s.io/apimachinery/pkg/types"
|
|
||||||
ctrl "sigs.k8s.io/controller-runtime"
|
ctrl "sigs.k8s.io/controller-runtime"
|
||||||
|
|
||||||
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
||||||
|
@ -33,8 +30,10 @@ func (r *SeaweedReconciler) ensureFilerServers(seaweedCR *seaweedv1.Seaweed) (do
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SeaweedReconciler) ensureFilerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
func (r *SeaweedReconciler) ensureFilerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
||||||
|
log := r.Log.WithValues("sw-filer-statefulset", seaweedCR.Name)
|
||||||
|
|
||||||
filerStatefulSet := r.createFilerStatefulSet(seaweedCR)
|
filerStatefulSet := r.createFilerStatefulSet(seaweedCR)
|
||||||
_, err := r.CreateOrUpdate(seaweedCR, filerStatefulSet, func(existing, desired runtime.Object) error {
|
_, err := r.CreateOrUpdate(filerStatefulSet, func(existing, desired runtime.Object) error {
|
||||||
existingStatefulSet := existing.(*appsv1.StatefulSet)
|
existingStatefulSet := existing.(*appsv1.StatefulSet)
|
||||||
desiredStatefulSet := desired.(*appsv1.StatefulSet)
|
desiredStatefulSet := desired.(*appsv1.StatefulSet)
|
||||||
|
|
||||||
|
@ -42,44 +41,34 @@ func (r *SeaweedReconciler) ensureFilerStatefulSet(seaweedCR *seaweedv1.Seaweed)
|
||||||
existingStatefulSet.Spec.Template.Spec.Containers[0].Image = desiredStatefulSet.Spec.Template.Spec.Containers[0].Image
|
existingStatefulSet.Spec.Template.Spec.Containers[0].Image = desiredStatefulSet.Spec.Template.Spec.Containers[0].Image
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
log.Info("ensure filer stateful set " + filerStatefulSet.Name)
|
||||||
return ReconcileResult(err)
|
return ReconcileResult(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SeaweedReconciler) ensureFilerHeadlessService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
func (r *SeaweedReconciler) ensureFilerHeadlessService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
||||||
return r.ensureService(seaweedCR, "filer-headless", r.createFilerHeadlessService)
|
|
||||||
|
log := r.Log.WithValues("sw-filer-headless-service", seaweedCR.Name)
|
||||||
|
|
||||||
|
filerHeadlessService := r.createFilerHeadlessService(seaweedCR)
|
||||||
|
_, err := r.CreateOrUpdateService(filerHeadlessService)
|
||||||
|
|
||||||
|
log.Info("ensure filer headless service " + filerHeadlessService.Name)
|
||||||
|
|
||||||
|
return ReconcileResult(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SeaweedReconciler) ensureFilerNodePortService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
func (r *SeaweedReconciler) ensureFilerNodePortService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
||||||
return r.ensureService(seaweedCR, "filer", r.createFilerNodePortService)
|
|
||||||
|
log := r.Log.WithValues("sw-filer-service", seaweedCR.Name)
|
||||||
|
|
||||||
|
filerService := r.createFilerNodePortService(seaweedCR)
|
||||||
|
_, err := r.CreateOrUpdateService(filerService)
|
||||||
|
|
||||||
|
log.Info("ensure filer service " + filerService.Name)
|
||||||
|
|
||||||
|
return ReconcileResult(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func labelsForFiler(name string) map[string]string {
|
func labelsForFiler(name string) map[string]string {
|
||||||
return map[string]string{"app": "seaweedfs", "role": "filer", "name": name}
|
return map[string]string{"app": "seaweedfs", "role": "filer", "name": name}
|
||||||
}
|
}
|
||||||
|
|
||||||
type CreateServiceFunc func(m *seaweedv1.Seaweed) *corev1.Service
|
|
||||||
|
|
||||||
func (r *SeaweedReconciler) ensureService(seaweedCR *seaweedv1.Seaweed, nameSuffix string, serviceFunc CreateServiceFunc) (bool, ctrl.Result, error) {
|
|
||||||
ctx := context.Background()
|
|
||||||
log := r.Log.WithValues("sw", seaweedCR.Name, "service", nameSuffix)
|
|
||||||
|
|
||||||
aService := &corev1.Service{}
|
|
||||||
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-" + nameSuffix, Namespace: seaweedCR.Namespace}, aService)
|
|
||||||
if err != nil && errors.IsNotFound(err) {
|
|
||||||
// Define a new deployment
|
|
||||||
dep := serviceFunc(seaweedCR)
|
|
||||||
log.Info("Creating a new service", "Namespace", dep.Namespace, "Name", dep.Name)
|
|
||||||
err = r.Create(ctx, dep)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err, "Failed to create service", "Namespace", dep.Namespace, "Name", dep.Name)
|
|
||||||
return ReconcileResult(err)
|
|
||||||
}
|
|
||||||
// Deployment created successfully - return and requeue
|
|
||||||
return ReconcileResult(err)
|
|
||||||
} else if err != nil {
|
|
||||||
log.Error(err, "Failed to get server service")
|
|
||||||
return ReconcileResult(err)
|
|
||||||
}
|
|
||||||
log.Info("Get service " + aService.Name)
|
|
||||||
return ReconcileResult(err)
|
|
||||||
}
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
appsv1 "k8s.io/api/apps/v1"
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
corev1 "k8s.io/api/core/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
ctrl "sigs.k8s.io/controller-runtime"
|
ctrl "sigs.k8s.io/controller-runtime"
|
||||||
|
@ -72,26 +71,11 @@ func (r *SeaweedReconciler) ensureMasterStatefulSet(seaweedCR *seaweedv1.Seaweed
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SeaweedReconciler) ensureMasterService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
func (r *SeaweedReconciler) ensureMasterService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
||||||
ctx := context.Background()
|
|
||||||
log := r.Log.WithValues("sw-master-service", seaweedCR.Name)
|
log := r.Log.WithValues("sw-master-service", seaweedCR.Name)
|
||||||
|
|
||||||
masterService := &corev1.Service{}
|
masterService := r.createMasterService(seaweedCR)
|
||||||
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-master", Namespace: seaweedCR.Namespace}, masterService)
|
_, err := r.CreateOrUpdateService(masterService)
|
||||||
if err != nil && errors.IsNotFound(err) {
|
|
||||||
// Define a new deployment
|
|
||||||
dep := r.createMasterService(seaweedCR)
|
|
||||||
log.Info("Creating a new master service", "Namespace", dep.Namespace, "Name", dep.Name)
|
|
||||||
err = r.Create(ctx, dep)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err, "Failed to create master service", "Namespace", dep.Namespace, "Name", dep.Name)
|
|
||||||
return ReconcileResult(err)
|
|
||||||
}
|
|
||||||
// Deployment created successfully - return and requeue
|
|
||||||
return ReconcileResult(err)
|
|
||||||
} else if err != nil {
|
|
||||||
log.Error(err, "Failed to get master service", "Namespace", seaweedCR.Namespace, "Name", seaweedCR.Name+"-master")
|
|
||||||
return ReconcileResult(err)
|
|
||||||
}
|
|
||||||
log.Info("Get master service " + masterService.Name)
|
log.Info("Get master service " + masterService.Name)
|
||||||
return ReconcileResult(err)
|
return ReconcileResult(err)
|
||||||
|
|
||||||
|
|
|
@ -2,11 +2,9 @@ package controllers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
|
||||||
appsv1 "k8s.io/api/apps/v1"
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
corev1 "k8s.io/api/core/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
"k8s.io/apimachinery/pkg/types"
|
|
||||||
ctrl "sigs.k8s.io/controller-runtime"
|
ctrl "sigs.k8s.io/controller-runtime"
|
||||||
|
|
||||||
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
||||||
|
@ -28,65 +26,30 @@ func (r *SeaweedReconciler) ensureVolumeServers(seaweedCR *seaweedv1.Seaweed) (d
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SeaweedReconciler) ensureVolumeServerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
func (r *SeaweedReconciler) ensureVolumeServerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
||||||
ctx := context.Background()
|
|
||||||
log := r.Log.WithValues("sw-volume-statefulset", seaweedCR.Name)
|
log := r.Log.WithValues("sw-volume-statefulset", seaweedCR.Name)
|
||||||
|
|
||||||
volumeServerStatefulSet := &appsv1.StatefulSet{}
|
volumeServerStatefulSet := r.createVolumeServerStatefulSet(seaweedCR)
|
||||||
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-volume", Namespace: seaweedCR.Namespace}, volumeServerStatefulSet)
|
_, err := r.CreateOrUpdate(volumeServerStatefulSet, func(existing, desired runtime.Object) error {
|
||||||
if err != nil && errors.IsNotFound(err) {
|
existingStatefulSet := existing.(*appsv1.StatefulSet)
|
||||||
// Define a new deployment
|
desiredStatefulSet := desired.(*appsv1.StatefulSet)
|
||||||
dep := r.createVolumeServerStatefulSet(seaweedCR)
|
|
||||||
log.Info("Creating a new volume statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
|
|
||||||
err = r.Create(ctx, dep)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err, "Failed to create new volume statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
|
|
||||||
return ReconcileResult(err)
|
|
||||||
}
|
|
||||||
// Deployment created successfully - return and requeue
|
|
||||||
return ReconcileResult(err)
|
|
||||||
} else if err != nil {
|
|
||||||
log.Error(err, "Failed to get volume server statefulset")
|
|
||||||
return ReconcileResult(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if *volumeServerStatefulSet.Spec.Replicas != seaweedCR.Spec.VolumeServerCount ||
|
existingStatefulSet.Spec.Replicas = desiredStatefulSet.Spec.Replicas
|
||||||
volumeServerStatefulSet.Spec.Template.Spec.Containers[0].Image != seaweedCR.Spec.Image {
|
existingStatefulSet.Spec.Template.Spec.Containers[0].Image = desiredStatefulSet.Spec.Template.Spec.Containers[0].Image
|
||||||
volumeServerStatefulSet.Spec.Replicas = &seaweedCR.Spec.VolumeServerCount
|
return nil
|
||||||
volumeServerStatefulSet.Spec.Template.Spec.Containers[0].Image = seaweedCR.Spec.Image
|
})
|
||||||
if err = r.Update(ctx, volumeServerStatefulSet); err != nil {
|
|
||||||
log.Error(err, "Failed to update volume statefulset", "Namespace", volumeServerStatefulSet.Namespace, "Name", volumeServerStatefulSet.Name)
|
|
||||||
return ReconcileResult(err)
|
|
||||||
}
|
|
||||||
// Deployment created successfully - return and requeue
|
|
||||||
return ReconcileResult(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("Get volume stateful set " + volumeServerStatefulSet.Name)
|
log.Info("ensure volume stateful set " + volumeServerStatefulSet.Name)
|
||||||
return ReconcileResult(err)
|
return ReconcileResult(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SeaweedReconciler) ensureVolumeServerService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
func (r *SeaweedReconciler) ensureVolumeServerService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
||||||
ctx := context.Background()
|
|
||||||
log := r.Log.WithValues("sw-volume-service", seaweedCR.Name)
|
log := r.Log.WithValues("sw-volume-service", seaweedCR.Name)
|
||||||
|
|
||||||
volumeServerService := &corev1.Service{}
|
volumeServerService := r.createVolumeServerService(seaweedCR)
|
||||||
err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-volume", Namespace: seaweedCR.Namespace}, volumeServerService)
|
_, err := r.CreateOrUpdateService(volumeServerService)
|
||||||
if err != nil && errors.IsNotFound(err) {
|
|
||||||
// Define a new deployment
|
log.Info("ensure volume service " + volumeServerService.Name)
|
||||||
dep := r.createVolumeServerService(seaweedCR)
|
|
||||||
log.Info("Creating a new volume service", "Namespace", dep.Namespace, "Name", dep.Name)
|
|
||||||
err = r.Create(ctx, dep)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err, "Failed to create new volume service", "Namespace", dep.Namespace, "Name", dep.Name)
|
|
||||||
return ReconcileResult(err)
|
|
||||||
}
|
|
||||||
// Deployment created successfully - return and requeue
|
|
||||||
return ReconcileResult(err)
|
|
||||||
} else if err != nil {
|
|
||||||
log.Error(err, "Failed to get volume server service")
|
|
||||||
return ReconcileResult(err)
|
|
||||||
}
|
|
||||||
log.Info("Get volume service " + volumeServerService.Name)
|
|
||||||
return ReconcileResult(err)
|
return ReconcileResult(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue