From bef24b2e3366133fb5a470d3b509820fbad4b62b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 5 Aug 2020 21:11:40 -0700 Subject: [PATCH] create filer and s3 --- api/v1/seaweed_types.go | 6 + .../bases/seaweed.seaweedfs.com_seaweeds.yaml | 6 + config/samples/seaweed_v1_seaweed.yaml | 2 +- controllers/controller_filer.go | 82 +++++++++++ controllers/controller_filer_service.go | 50 +++++++ controllers/controller_filer_statefulset.go | 134 ++++++++++++++++++ controllers/controller_s3.go | 82 +++++++++++ controllers/controller_s3_deployment.go | 123 ++++++++++++++++ controllers/controller_s3_service.go | 36 +++++ controllers/controller_volume_statefulset.go | 2 +- controllers/seaweed_controller.go | 14 ++ 11 files changed, 535 insertions(+), 2 deletions(-) create mode 100644 controllers/controller_filer.go create mode 100644 controllers/controller_filer_service.go create mode 100644 controllers/controller_filer_statefulset.go create mode 100644 controllers/controller_s3.go create mode 100644 controllers/controller_s3_deployment.go create mode 100644 controllers/controller_s3_service.go diff --git a/api/v1/seaweed_types.go b/api/v1/seaweed_types.go index 1ab07c9..74f1516 100644 --- a/api/v1/seaweed_types.go +++ b/api/v1/seaweed_types.go @@ -33,6 +33,12 @@ type SeaweedSpec struct { // VolumeServerCount is the number of volume servers, default to 1 VolumeServerCount int `json:"volumeServerCount,omitempty"` + + // FilerCount is the number of filers, default to 1 + FilerCount int `json:"filerCount,omitempty"` + + // S3Count is the number of s3, default to 1 + S3Count int `json:"s3Count,omitempty"` } // SeaweedStatus defines the observed state of Seaweed diff --git a/config/crd/bases/seaweed.seaweedfs.com_seaweeds.yaml b/config/crd/bases/seaweed.seaweedfs.com_seaweeds.yaml index 7ed4520..e9daa64 100644 --- a/config/crd/bases/seaweed.seaweedfs.com_seaweeds.yaml +++ b/config/crd/bases/seaweed.seaweedfs.com_seaweeds.yaml @@ -36,9 +36,15 @@ spec: spec: description: SeaweedSpec defines the desired state of Seaweed properties: + filerCount: + description: FilerCount is the number of filers, default to 1 + type: integer metricsAddress: description: MetricsAddress is Prometheus gateway address type: string + s3Count: + description: S3Count is the number of s3, default to 1 + type: integer volumeServerCount: description: VolumeServerCount is the number of volume servers, default to 1 diff --git a/config/samples/seaweed_v1_seaweed.yaml b/config/samples/seaweed_v1_seaweed.yaml index dbafbcf..8f4f5c0 100644 --- a/config/samples/seaweed_v1_seaweed.yaml +++ b/config/samples/seaweed_v1_seaweed.yaml @@ -1,7 +1,7 @@ apiVersion: seaweed.seaweedfs.com/v1 kind: Seaweed metadata: - name: seaweed-sample + name: seaweed1 spec: # Add fields here foo: bar diff --git a/controllers/controller_filer.go b/controllers/controller_filer.go new file mode 100644 index 0000000..059e385 --- /dev/null +++ b/controllers/controller_filer.go @@ -0,0 +1,82 @@ +package controllers + +import ( + "context" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + + seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" +) + +func (r *SeaweedReconciler) ensureFilerServers(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) { + _ = context.Background() + _ = r.Log.WithValues("seaweed", seaweedCR.Name) + + if done, result, err = r.ensureFilerStatefulSet(seaweedCR); done { + return done, result, err + } + + if done, result, err = r.ensureFilerService(seaweedCR); done { + return done, result, err + } + + return false, ctrl.Result{}, nil +} + +func (r *SeaweedReconciler) ensureFilerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { + ctx := context.Background() + log := r.Log.WithValues("sw-filer-statefulset", seaweedCR.Name) + + filerStatefulSet := &appsv1.StatefulSet{} + err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-filer", Namespace: seaweedCR.Namespace}, filerStatefulSet) + if err != nil && errors.IsNotFound(err) { + // Define a new deployment + dep := r.createFilerStatefulSet(seaweedCR) + log.Info("Creating a new filer statefulset", "Namespace", dep.Namespace, "Name", dep.Name) + err = r.Create(ctx, dep) + if err != nil { + log.Error(err, "Failed to create new filer statefulset", "Namespace", dep.Namespace, "Name", dep.Name) + return true, ctrl.Result{}, err + } + // Deployment created successfully - return and requeue + return false, ctrl.Result{}, nil + } else if err != nil { + log.Error(err, "Failed to get filer statefulset") + return true, ctrl.Result{}, err + } + log.Info("Get filer stateful set " + filerStatefulSet.Name) + return false, ctrl.Result{}, nil +} + +func (r *SeaweedReconciler) ensureFilerService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { + ctx := context.Background() + log := r.Log.WithValues("sw-filer-service", seaweedCR.Name) + + volumeServerService := &corev1.Service{} + err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-filer", Namespace: seaweedCR.Namespace}, volumeServerService) + if err != nil && errors.IsNotFound(err) { + // Define a new deployment + dep := r.createFilerService(seaweedCR) + log.Info("Creating a new filer service", "Namespace", dep.Namespace, "Name", dep.Name) + err = r.Create(ctx, dep) + if err != nil { + log.Error(err, "Failed to create new filer service", "Namespace", dep.Namespace, "Name", dep.Name) + return true, ctrl.Result{}, err + } + // Deployment created successfully - return and requeue + return false, ctrl.Result{}, nil + } else if err != nil { + log.Error(err, "Failed to get filer server service") + return true, ctrl.Result{}, err + } + log.Info("Get filer service " + volumeServerService.Name) + return false, ctrl.Result{}, nil +} + +func labelsForFiler(name string) map[string]string { + return map[string]string{"app": "seaweedfs", "role": "filer", "name": name} +} diff --git a/controllers/controller_filer_service.go b/controllers/controller_filer_service.go new file mode 100644 index 0000000..6f8fddd --- /dev/null +++ b/controllers/controller_filer_service.go @@ -0,0 +1,50 @@ +package controllers + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" +) + +func (r *SeaweedReconciler) createFilerService(m *seaweedv1.Seaweed) *corev1.Service { + labels := labelsForFiler(m.Name) + + dep := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: m.Name + "-filer", + Namespace: m.Namespace, + Labels: labels, + Annotations: map[string]string{ + "service.alpha.kubernetes.io/tolerate-unready-endpoints": "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "None", + PublishNotReadyAddresses: true, + Ports: []corev1.ServicePort{ + { + Name: "swfs-filer", + Protocol: corev1.Protocol("TCP"), + Port: 8888, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 8888, + }, + }, + { + Name: "swfs-volume-grpc", + Protocol: corev1.Protocol("TCP"), + Port: 18888, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 18888, + }, + }, + }, + Selector: labels, + }, + } + return dep +} diff --git a/controllers/controller_filer_statefulset.go b/controllers/controller_filer_statefulset.go new file mode 100644 index 0000000..3278c27 --- /dev/null +++ b/controllers/controller_filer_statefulset.go @@ -0,0 +1,134 @@ +package controllers + +import ( + "fmt" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" +) + +func (r *SeaweedReconciler) createFilerStatefulSet(m *seaweedv1.Seaweed) *appsv1.StatefulSet { + labels := labelsForFiler(m.Name) + replicas := int32(m.Spec.FilerCount) + rollingUpdatePartition := int32(0) + enableServiceLinks := false + + dep := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: m.Name + "-filer", + Namespace: m.Namespace, + }, + Spec: appsv1.StatefulSetSpec{ + ServiceName: m.Name + "-filer", + PodManagementPolicy: appsv1.ParallelPodManagement, + Replicas: &replicas, + UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.RollingUpdateStatefulSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: &rollingUpdatePartition, + }, + }, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + EnableServiceLinks: &enableServiceLinks, + Containers: []corev1.Container{{ + Name: "seaweedfs", + Image: "chrislusf/seaweedfs:latest", + ImagePullPolicy: corev1.PullIfNotPresent, + Env: []corev1.EnvVar{ + { + Name: "POD_IP", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "status.podIP", + }, + }, + }, + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + { + Name: "NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + }, + Command: []string{ + "/bin/sh", + "-ec", + fmt.Sprintf("weed filer -port=8888 %s %s", + fmt.Sprintf("-ip=$(POD_NAME).%s-filer", m.Name), + fmt.Sprintf("-peers=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333", + m.Name, m.Name, m.Name, m.Name, m.Name, m.Name), + ), + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8888, + Name: "swfs-filer", + }, + { + ContainerPort: 18888, + }, + }, + /* + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/cluster/status", + Port: intstr.IntOrString{ + Type: 0, + IntVal: 9333, + }, + Scheme: "http", + }, + }, + InitialDelaySeconds: 5, + TimeoutSeconds: 0, + PeriodSeconds: 15, + SuccessThreshold: 2, + FailureThreshold: 100, + }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/cluster/status", + Port: intstr.IntOrString{ + Type: 0, + IntVal: 9333, + }, + Scheme: "http", + }, + }, + InitialDelaySeconds: 20, + TimeoutSeconds: 0, + PeriodSeconds: 10, + SuccessThreshold: 1, + FailureThreshold: 6, + }, + + */ + }}, + }, + }, + }, + } + return dep +} diff --git a/controllers/controller_s3.go b/controllers/controller_s3.go new file mode 100644 index 0000000..224d474 --- /dev/null +++ b/controllers/controller_s3.go @@ -0,0 +1,82 @@ +package controllers + +import ( + "context" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + + seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" +) + +func (r *SeaweedReconciler) ensureS3Servers(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) { + _ = context.Background() + _ = r.Log.WithValues("seaweed", seaweedCR.Name) + + if done, result, err = r.ensureS3Deployment(seaweedCR); done { + return done, result, err + } + + if done, result, err = r.ensureS3Service(seaweedCR); done { + return done, result, err + } + + return false, ctrl.Result{}, nil +} + +func (r *SeaweedReconciler) ensureS3Deployment(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { + ctx := context.Background() + log := r.Log.WithValues("sw-s3-statefulset", seaweedCR.Name) + + s3Deployment := &appsv1.Deployment{} + err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-s3", Namespace: seaweedCR.Namespace}, s3Deployment) + if err != nil && errors.IsNotFound(err) { + // Define a new deployment + dep := r.createS3Deployment(seaweedCR) + log.Info("Creating a new s3 deployment", "Namespace", dep.Namespace, "Name", dep.Name) + err = r.Create(ctx, dep) + if err != nil { + log.Error(err, "Failed to create new s3 statefulset", "Namespace", dep.Namespace, "Name", dep.Name) + return true, ctrl.Result{}, err + } + // Deployment created successfully - return and requeue + return false, ctrl.Result{}, nil + } else if err != nil { + log.Error(err, "Failed to get s3 statefulset") + return true, ctrl.Result{}, err + } + log.Info("Get s3 stateful set " + s3Deployment.Name) + return false, ctrl.Result{}, nil +} + +func (r *SeaweedReconciler) ensureS3Service(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { + ctx := context.Background() + log := r.Log.WithValues("sw-filer-service", seaweedCR.Name) + + s3Service := &corev1.Service{} + err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-s3", Namespace: seaweedCR.Namespace}, s3Service) + if err != nil && errors.IsNotFound(err) { + // Define a new deployment + dep := r.createS3Service(seaweedCR) + log.Info("Creating a new s3 service", "Namespace", dep.Namespace, "Name", dep.Name) + err = r.Create(ctx, dep) + if err != nil { + log.Error(err, "Failed to create new s3 service", "Namespace", dep.Namespace, "Name", dep.Name) + return true, ctrl.Result{}, err + } + // Deployment created successfully - return and requeue + return false, ctrl.Result{}, nil + } else if err != nil { + log.Error(err, "Failed to get s3 server service") + return true, ctrl.Result{}, err + } + log.Info("Get s3 service " + s3Service.Name) + return false, ctrl.Result{}, nil +} + +func labelsForS3(name string) map[string]string { + return map[string]string{"app": "seaweedfs", "role": "s3", "name": name} +} diff --git a/controllers/controller_s3_deployment.go b/controllers/controller_s3_deployment.go new file mode 100644 index 0000000..1f05996 --- /dev/null +++ b/controllers/controller_s3_deployment.go @@ -0,0 +1,123 @@ +package controllers + +import ( + "fmt" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" +) + +func (r *SeaweedReconciler) createS3Deployment(m *seaweedv1.Seaweed) *appsv1.Deployment { + labels := labelsForS3(m.Name) + replicas := int32(m.Spec.S3Count) + enableServiceLinks := false + + dep := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: m.Name + "-s3", + Namespace: m.Namespace, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + EnableServiceLinks: &enableServiceLinks, + Containers: []corev1.Container{{ + Name: "seaweedfs", + Image: "chrislusf/seaweedfs:latest", + ImagePullPolicy: corev1.PullIfNotPresent, + Env: []corev1.EnvVar{ + { + Name: "POD_IP", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "status.podIP", + }, + }, + }, + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + { + Name: "NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + }, + Command: []string{ + "/bin/sh", + "-ec", + fmt.Sprintf("weed s3 -port=8333 %s", + fmt.Sprintf("-filer=$(POD_NAME).%s-filer:8888", m.Name), + ), + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8333, + Name: "swfs-s3", + }, + { + ContainerPort: 18333, + }, + }, + /* + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/cluster/status", + Port: intstr.IntOrString{ + Type: 0, + IntVal: 9333, + }, + Scheme: "http", + }, + }, + InitialDelaySeconds: 5, + TimeoutSeconds: 0, + PeriodSeconds: 15, + SuccessThreshold: 2, + FailureThreshold: 100, + }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/cluster/status", + Port: intstr.IntOrString{ + Type: 0, + IntVal: 9333, + }, + Scheme: "http", + }, + }, + InitialDelaySeconds: 20, + TimeoutSeconds: 0, + PeriodSeconds: 10, + SuccessThreshold: 1, + FailureThreshold: 6, + }, + + */ + }}, + }, + }, + }, + } + return dep +} diff --git a/controllers/controller_s3_service.go b/controllers/controller_s3_service.go new file mode 100644 index 0000000..4265a86 --- /dev/null +++ b/controllers/controller_s3_service.go @@ -0,0 +1,36 @@ +package controllers + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" +) + +func (r *SeaweedReconciler) createS3Service(m *seaweedv1.Seaweed) *corev1.Service { + labels := labelsForS3(m.Name) + + dep := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: m.Name + "-s3", + Namespace: m.Namespace, + Labels: labels, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "swfs-s3", + Protocol: corev1.Protocol("TCP"), + Port: 8333, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 8333, + }, + }, + }, + Selector: labels, + }, + } + return dep +} diff --git a/controllers/controller_volume_statefulset.go b/controllers/controller_volume_statefulset.go index 8945e66..94dc3e9 100644 --- a/controllers/controller_volume_statefulset.go +++ b/controllers/controller_volume_statefulset.go @@ -75,7 +75,7 @@ func (r *SeaweedReconciler) createVolumeServerStatefulSet(m *seaweedv1.Seaweed) "-ec", fmt.Sprintf("weed volume -port=8444 -max=0 %s %s", fmt.Sprintf("-ip=$(POD_NAME).%s-volume", m.Name), - fmt.Sprintf("-peers=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333", + fmt.Sprintf("-mserver=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333", m.Name, m.Name, m.Name, m.Name, m.Name, m.Name), ), }, diff --git a/controllers/seaweed_controller.go b/controllers/seaweed_controller.go index fe62708..570ce02 100644 --- a/controllers/seaweed_controller.go +++ b/controllers/seaweed_controller.go @@ -54,6 +54,12 @@ func (r *SeaweedReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { if seaweedCR.Spec.VolumeServerCount == 0 { seaweedCR.Spec.VolumeServerCount = 1 } + if seaweedCR.Spec.FilerCount == 0 { + seaweedCR.Spec.FilerCount = 1 + } + if seaweedCR.Spec.S3Count == 0 { + seaweedCR.Spec.S3Count = 1 + } if done, result, err = r.ensureMaster(seaweedCR); done { return result, err @@ -63,6 +69,14 @@ func (r *SeaweedReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return result, err } + if done, result, err = r.ensureFilerServers(seaweedCR); done { + return result, err + } + + if done, result, err = r.ensureS3Servers(seaweedCR); done { + return result, err + } + return ctrl.Result{}, nil }