From 3a17b591e7d70f42f3fc5e9af7955c3321ec7543 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 4 Aug 2020 22:13:36 -0700 Subject: [PATCH] add volume service and statefulset --- api/v1/seaweed_types.go | 3 + .../bases/seaweed.seaweedfs.com_seaweeds.yaml | 4 + controllers/controller_volume.go | 82 +++++++++++ controllers/controller_volume_service.go | 50 +++++++ controllers/controller_volume_statefulset.go | 134 ++++++++++++++++++ controllers/seaweed_controller.go | 9 ++ 6 files changed, 282 insertions(+) create mode 100644 controllers/controller_volume.go create mode 100644 controllers/controller_volume_service.go create mode 100644 controllers/controller_volume_statefulset.go diff --git a/api/v1/seaweed_types.go b/api/v1/seaweed_types.go index 311b2e1..1ab07c9 100644 --- a/api/v1/seaweed_types.go +++ b/api/v1/seaweed_types.go @@ -30,6 +30,9 @@ type SeaweedSpec struct { // MetricsAddress is Prometheus gateway address MetricsAddress string `json:"metricsAddress,omitempty"` + + // VolumeServerCount is the number of volume servers, default to 1 + VolumeServerCount int `json:"volumeServerCount,omitempty"` } // SeaweedStatus defines the observed state of Seaweed diff --git a/config/crd/bases/seaweed.seaweedfs.com_seaweeds.yaml b/config/crd/bases/seaweed.seaweedfs.com_seaweeds.yaml index 2602b07..7ed4520 100644 --- a/config/crd/bases/seaweed.seaweedfs.com_seaweeds.yaml +++ b/config/crd/bases/seaweed.seaweedfs.com_seaweeds.yaml @@ -39,6 +39,10 @@ spec: metricsAddress: description: MetricsAddress is Prometheus gateway address type: string + volumeServerCount: + description: VolumeServerCount is the number of volume servers, default + to 1 + type: integer type: object status: description: SeaweedStatus defines the observed state of Seaweed diff --git a/controllers/controller_volume.go b/controllers/controller_volume.go new file mode 100644 index 0000000..5d0cc28 --- /dev/null +++ b/controllers/controller_volume.go @@ -0,0 +1,82 @@ +package controllers + +import ( + "context" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + + seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" +) + +func (r *SeaweedReconciler) ensureVolumeServers(seaweedCR *seaweedv1.Seaweed) (done bool, result ctrl.Result, err error) { + _ = context.Background() + _ = r.Log.WithValues("seaweed", seaweedCR.Name) + + if done, result, err = r.ensureVolumeServerStatefulSet(seaweedCR); done { + return done, result, err + } + + if done, result, err = r.ensureVolumeServerService(seaweedCR); done { + return done, result, err + } + + return false, ctrl.Result{}, nil +} + +func (r *SeaweedReconciler) ensureVolumeServerStatefulSet(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { + ctx := context.Background() + log := r.Log.WithValues("sw-volume-statefulset", seaweedCR.Name) + + volumeServerStatefulSet := &appsv1.StatefulSet{} + err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-volume", Namespace: seaweedCR.Namespace}, volumeServerStatefulSet) + if err != nil && errors.IsNotFound(err) { + // Define a new deployment + dep := r.createVolumeServerStatefulSet(seaweedCR) + log.Info("Creating a new volume statefulset", "Namespace", dep.Namespace, "Name", dep.Name) + err = r.Create(ctx, dep) + if err != nil { + log.Error(err, "Failed to create new volume statefulset", "Namespace", dep.Namespace, "Name", dep.Name) + return true, ctrl.Result{}, err + } + // Deployment created successfully - return and requeue + return false, ctrl.Result{}, nil + } else if err != nil { + log.Error(err, "Failed to get volume server statefulset") + return true, ctrl.Result{}, err + } + log.Info("Get volume stateful set " + volumeServerStatefulSet.Name) + return false, ctrl.Result{}, nil +} + +func (r *SeaweedReconciler) ensureVolumeServerService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) { + ctx := context.Background() + log := r.Log.WithValues("sw-volume-service", seaweedCR.Name) + + volumeServerService := &corev1.Service{} + err := r.Get(ctx, types.NamespacedName{Name: seaweedCR.Name + "-volume", Namespace: seaweedCR.Namespace}, volumeServerService) + if err != nil && errors.IsNotFound(err) { + // Define a new deployment + dep := r.createVolumeServerService(seaweedCR) + log.Info("Creating a new volume service", "Namespace", dep.Namespace, "Name", dep.Name) + err = r.Create(ctx, dep) + if err != nil { + log.Error(err, "Failed to create new volume service", "Namespace", dep.Namespace, "Name", dep.Name) + return true, ctrl.Result{}, err + } + // Deployment created successfully - return and requeue + return false, ctrl.Result{}, nil + } else if err != nil { + log.Error(err, "Failed to get volume server service") + return true, ctrl.Result{}, err + } + log.Info("Get volume service " + volumeServerService.Name) + return false, ctrl.Result{}, nil +} + +func labelsForVolumeServer(name string) map[string]string { + return map[string]string{"app": "seaweedfs", "role": "volume", "name": name} +} diff --git a/controllers/controller_volume_service.go b/controllers/controller_volume_service.go new file mode 100644 index 0000000..aed2483 --- /dev/null +++ b/controllers/controller_volume_service.go @@ -0,0 +1,50 @@ +package controllers + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" +) + +func (r *SeaweedReconciler) createVolumeServerService(m *seaweedv1.Seaweed) *corev1.Service { + labels := labelsForVolumeServer(m.Name) + + dep := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: m.Name + "-volume", + Namespace: m.Namespace, + Labels: labels, + Annotations: map[string]string{ + "service.alpha.kubernetes.io/tolerate-unready-endpoints": "true", + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "None", + PublishNotReadyAddresses: true, + Ports: []corev1.ServicePort{ + { + Name: "swfs-volume", + Protocol: corev1.Protocol("TCP"), + Port: 8444, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 8444, + }, + }, + { + Name: "swfs-volume-grpc", + Protocol: corev1.Protocol("TCP"), + Port: 18444, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 18444, + }, + }, + }, + Selector: labels, + }, + } + return dep +} diff --git a/controllers/controller_volume_statefulset.go b/controllers/controller_volume_statefulset.go new file mode 100644 index 0000000..8945e66 --- /dev/null +++ b/controllers/controller_volume_statefulset.go @@ -0,0 +1,134 @@ +package controllers + +import ( + "fmt" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1" +) + +func (r *SeaweedReconciler) createVolumeServerStatefulSet(m *seaweedv1.Seaweed) *appsv1.StatefulSet { + labels := labelsForVolumeServer(m.Name) + replicas := int32(m.Spec.VolumeServerCount) + rollingUpdatePartition := int32(0) + enableServiceLinks := false + + dep := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: m.Name + "-volume", + Namespace: m.Namespace, + }, + Spec: appsv1.StatefulSetSpec{ + ServiceName: m.Name + "-volume", + PodManagementPolicy: appsv1.ParallelPodManagement, + Replicas: &replicas, + UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.RollingUpdateStatefulSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: &rollingUpdatePartition, + }, + }, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + EnableServiceLinks: &enableServiceLinks, + Containers: []corev1.Container{{ + Name: "seaweedfs", + Image: "chrislusf/seaweedfs:latest", + ImagePullPolicy: corev1.PullIfNotPresent, + Env: []corev1.EnvVar{ + { + Name: "POD_IP", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "status.podIP", + }, + }, + }, + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + { + Name: "NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + }, + Command: []string{ + "/bin/sh", + "-ec", + fmt.Sprintf("weed volume -port=8444 -max=0 %s %s", + fmt.Sprintf("-ip=$(POD_NAME).%s-volume", m.Name), + fmt.Sprintf("-peers=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333", + m.Name, m.Name, m.Name, m.Name, m.Name, m.Name), + ), + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8444, + Name: "swfs-volume", + }, + { + ContainerPort: 18444, + }, + }, + /* + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/cluster/status", + Port: intstr.IntOrString{ + Type: 0, + IntVal: 9333, + }, + Scheme: "http", + }, + }, + InitialDelaySeconds: 5, + TimeoutSeconds: 0, + PeriodSeconds: 15, + SuccessThreshold: 2, + FailureThreshold: 100, + }, + LivenessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/cluster/status", + Port: intstr.IntOrString{ + Type: 0, + IntVal: 9333, + }, + Scheme: "http", + }, + }, + InitialDelaySeconds: 20, + TimeoutSeconds: 0, + PeriodSeconds: 10, + SuccessThreshold: 1, + FailureThreshold: 6, + }, + + */ + }}, + }, + }, + }, + } + return dep +} diff --git a/controllers/seaweed_controller.go b/controllers/seaweed_controller.go index ba9bbf6..fe62708 100644 --- a/controllers/seaweed_controller.go +++ b/controllers/seaweed_controller.go @@ -50,10 +50,19 @@ func (r *SeaweedReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return result, err } + // temporary + if seaweedCR.Spec.VolumeServerCount == 0 { + seaweedCR.Spec.VolumeServerCount = 1 + } + if done, result, err = r.ensureMaster(seaweedCR); done { return result, err } + if done, result, err = r.ensureVolumeServers(seaweedCR); done { + return result, err + } + return ctrl.Result{}, nil }