diff --git a/api/v1/seaweed_types.go b/api/v1/seaweed_types.go index f4033e5..e6af00e 100644 --- a/api/v1/seaweed_types.go +++ b/api/v1/seaweed_types.go @@ -17,6 +17,8 @@ limitations under the License. package v1 import ( + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -34,9 +36,19 @@ type SeaweedSpec struct { // Image Image string `json:"image,omitempty"` + // Version + Version string `json:"version,omitempty"` + + // Master + Master *MasterSpec `json:"master,omitempty"` + // Volume + Volume *VolumeSpec `json:"volume,omitempty"` + // Filer + Filer *FilerSpec `json:"filer,omitempty"` + // VolumeServerCount is the number of volume servers, default to 1 - VolumeServerCount int32 `json:"volumeServerCount,omitempty"` - VolumeServerDiskCount int32 `json:"volumeServerDiskCount,omitempty"` + VolumeServerCount int32 `json:"volumeServerCount,omitempty"` + VolumeServerDiskCount int32 `json:"volumeServerDiskCount,omitempty"` VolumeServerDiskSizeInGiB int32 `json:"volumeServerDiskSizeInGiB,omitempty"` // FilerCount is the number of filers, default to 1 @@ -44,7 +56,6 @@ type SeaweedSpec struct { // ingress Hosts []string `json:"hosts"` - } // SeaweedStatus defines the observed state of Seaweed @@ -53,6 +64,105 @@ type SeaweedStatus struct { // Important: Run "make" to regenerate code after modifying this file } +// MasterSpec is the spec for masters +type MasterSpec struct { + ComponentSpec `json:",inline"` + corev1.ResourceRequirements `json:",inline"` + + // The desired ready replicas + // +kubebuilder:validation:Minimum=1 + Replicas int32 `json:"replicas"` +} + +// VolumeSpec is the spec for volume servers +type VolumeSpec struct { + ComponentSpec `json:",inline"` + corev1.ResourceRequirements `json:",inline"` + + // The desired ready replicas + // +kubebuilder:validation:Minimum=1 + Replicas int32 `json:"replicas"` +} + +// FilerSpec is the spec for filers +type FilerSpec struct { + ComponentSpec `json:",inline"` + corev1.ResourceRequirements `json:",inline"` + + // The desired ready replicas + // +kubebuilder:validation:Minimum=0 + Replicas int32 `json:"replicas"` +} + +// ComponentSpec is the base spec of each component, the fields should always accessed by the BasicSpec() method to respect the cluster-level properties +type ComponentSpec struct { + // Version of the component. Override the cluster-level version if non-empty + Version *string `json:"version,omitempty"` + + // ImagePullPolicy of the component. Override the cluster-level imagePullPolicy if present + ImagePullPolicy *corev1.PullPolicy `json:"imagePullPolicy,omitempty"` + + // ImagePullSecrets is an optional list of references to secrets in the same namespace to use for pulling any of the images. + ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` + + // Whether Hostnetwork of the component is enabled. Override the cluster-level setting if present + HostNetwork *bool `json:"hostNetwork,omitempty"` + + // Affinity of the component. Override the cluster-level one if present + Affinity *corev1.Affinity `json:"affinity,omitempty"` + + // PriorityClassName of the component. Override the cluster-level one if present + PriorityClassName *string `json:"priorityClassName,omitempty"` + + // SchedulerName of the component. Override the cluster-level one if present + SchedulerName *string `json:"schedulerName,omitempty"` + + // NodeSelector of the component. Merged into the cluster-level nodeSelector if non-empty + NodeSelector map[string]string `json:"nodeSelector,omitempty"` + + // Annotations of the component. Merged into the cluster-level annotations if non-empty + Annotations map[string]string `json:"annotations,omitempty"` + + // Tolerations of the component. Override the cluster-level tolerations if non-empty + Tolerations []corev1.Toleration `json:"tolerations,omitempty"` + + // List of environment variables to set in the container, like + // v1.Container.Env. + // Note that following env names cannot be used and may be overrided by + // tidb-operator built envs. + // - NAMESPACE + // - TZ + // - SERVICE_NAME + // - PEER_SERVICE_NAME + // - HEADLESS_SERVICE_NAME + // - SET_NAME + // - HOSTNAME + // - CLUSTER_NAME + // - POD_NAME + Env []corev1.EnvVar `json:"env,omitempty"` + + // Additional containers of the component. + AdditionalContainers []corev1.Container `json:"additionalContainers,omitempty"` + + // Additional volumes of component pod. Currently this only + // supports additional volume mounts for sidecar containers. + AdditionalVolumes []corev1.Volume `json:"additionalVolumes,omitempty"` + + // Optional duration in seconds the pod needs to terminate gracefully. May be decreased in delete request. + // Value must be non-negative integer. The value zero indicates delete immediately. + // If this value is nil, the default grace period will be used instead. + // The grace period is the duration in seconds after the processes running in the pod are sent + // a termination signal and the time when the processes are forcibly halted with a kill signal. + // Set this value longer than the expected cleanup time for your process. + // Defaults to 30 seconds. + TerminationGracePeriodSeconds *int64 `json:"terminationGracePeriodSeconds,omitempty"` + + // StatefulSetUpdateStrategy indicates the StatefulSetUpdateStrategy that will be + // employed to update Pods in the StatefulSet when a revision is made to + // Template. + StatefulSetUpdateStrategy appsv1.StatefulSetUpdateStrategyType `json:"statefulSetUpdateStrategy,omitempty"` +} + // +kubebuilder:object:root=true // +kubebuilder:subresource:status diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 384ca61..892a1b8 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -21,9 +21,141 @@ limitations under the License. package v1 import ( + corev1 "k8s.io/api/core/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ComponentSpec) DeepCopyInto(out *ComponentSpec) { + *out = *in + if in.Version != nil { + in, out := &in.Version, &out.Version + *out = new(string) + **out = **in + } + if in.ImagePullPolicy != nil { + in, out := &in.ImagePullPolicy, &out.ImagePullPolicy + *out = new(corev1.PullPolicy) + **out = **in + } + if in.ImagePullSecrets != nil { + in, out := &in.ImagePullSecrets, &out.ImagePullSecrets + *out = make([]corev1.LocalObjectReference, len(*in)) + copy(*out, *in) + } + if in.HostNetwork != nil { + in, out := &in.HostNetwork, &out.HostNetwork + *out = new(bool) + **out = **in + } + if in.Affinity != nil { + in, out := &in.Affinity, &out.Affinity + *out = new(corev1.Affinity) + (*in).DeepCopyInto(*out) + } + if in.PriorityClassName != nil { + in, out := &in.PriorityClassName, &out.PriorityClassName + *out = new(string) + **out = **in + } + if in.SchedulerName != nil { + in, out := &in.SchedulerName, &out.SchedulerName + *out = new(string) + **out = **in + } + if in.NodeSelector != nil { + in, out := &in.NodeSelector, &out.NodeSelector + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Annotations != nil { + in, out := &in.Annotations, &out.Annotations + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Tolerations != nil { + in, out := &in.Tolerations, &out.Tolerations + *out = make([]corev1.Toleration, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.Env != nil { + in, out := &in.Env, &out.Env + *out = make([]corev1.EnvVar, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.AdditionalContainers != nil { + in, out := &in.AdditionalContainers, &out.AdditionalContainers + *out = make([]corev1.Container, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.AdditionalVolumes != nil { + in, out := &in.AdditionalVolumes, &out.AdditionalVolumes + *out = make([]corev1.Volume, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.TerminationGracePeriodSeconds != nil { + in, out := &in.TerminationGracePeriodSeconds, &out.TerminationGracePeriodSeconds + *out = new(int64) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ComponentSpec. +func (in *ComponentSpec) DeepCopy() *ComponentSpec { + if in == nil { + return nil + } + out := new(ComponentSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FilerSpec) DeepCopyInto(out *FilerSpec) { + *out = *in + in.ComponentSpec.DeepCopyInto(&out.ComponentSpec) + in.ResourceRequirements.DeepCopyInto(&out.ResourceRequirements) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FilerSpec. +func (in *FilerSpec) DeepCopy() *FilerSpec { + if in == nil { + return nil + } + out := new(FilerSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MasterSpec) DeepCopyInto(out *MasterSpec) { + *out = *in + in.ComponentSpec.DeepCopyInto(&out.ComponentSpec) + in.ResourceRequirements.DeepCopyInto(&out.ResourceRequirements) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MasterSpec. +func (in *MasterSpec) DeepCopy() *MasterSpec { + if in == nil { + return nil + } + out := new(MasterSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Seaweed) DeepCopyInto(out *Seaweed) { *out = *in @@ -86,6 +218,21 @@ func (in *SeaweedList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SeaweedSpec) DeepCopyInto(out *SeaweedSpec) { *out = *in + if in.Master != nil { + in, out := &in.Master, &out.Master + *out = new(MasterSpec) + (*in).DeepCopyInto(*out) + } + if in.Volume != nil { + in, out := &in.Volume, &out.Volume + *out = new(VolumeSpec) + (*in).DeepCopyInto(*out) + } + if in.Filer != nil { + in, out := &in.Filer, &out.Filer + *out = new(FilerSpec) + (*in).DeepCopyInto(*out) + } if in.Hosts != nil { in, out := &in.Hosts, &out.Hosts *out = make([]string, len(*in)) @@ -117,3 +264,20 @@ func (in *SeaweedStatus) DeepCopy() *SeaweedStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *VolumeSpec) DeepCopyInto(out *VolumeSpec) { + *out = *in + in.ComponentSpec.DeepCopyInto(&out.ComponentSpec) + in.ResourceRequirements.DeepCopyInto(&out.ResourceRequirements) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VolumeSpec. +func (in *VolumeSpec) DeepCopy() *VolumeSpec { + if in == nil { + return nil + } + out := new(VolumeSpec) + in.DeepCopyInto(out) + return out +} diff --git a/controllers/controller_filer_statefulset.go b/controllers/controller_filer_statefulset.go index 82d065e..fdefa53 100644 --- a/controllers/controller_filer_statefulset.go +++ b/controllers/controller_filer_statefulset.go @@ -13,7 +13,7 @@ import ( func (r *SeaweedReconciler) createFilerStatefulSet(m *seaweedv1.Seaweed) *appsv1.StatefulSet { labels := labelsForFiler(m.Name) - replicas := int32(m.Spec.FilerCount) + replicas := int32(m.Spec.Filer.Replicas) rollingUpdatePartition := int32(0) enableServiceLinks := false @@ -76,8 +76,7 @@ func (r *SeaweedReconciler) createFilerStatefulSet(m *seaweedv1.Seaweed) *appsv1 "-ec", fmt.Sprintf("weed filer -port=8888 %s %s -s3", fmt.Sprintf("-ip=$(POD_NAME).%s-filer", m.Name), - fmt.Sprintf("-master=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333", - m.Name, m.Name, m.Name, m.Name, m.Name, m.Name), + fmt.Sprintf("-master=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)), ), }, Ports: []corev1.ContainerPort{ diff --git a/controllers/controller_master_statefulset.go b/controllers/controller_master_statefulset.go index e6720cf..c155d36 100644 --- a/controllers/controller_master_statefulset.go +++ b/controllers/controller_master_statefulset.go @@ -13,7 +13,7 @@ import ( func (r *SeaweedReconciler) createMasterStatefulSet(m *seaweedv1.Seaweed) *appsv1.StatefulSet { labels := labelsForMaster(m.Name) - replicas := int32(MasterClusterSize) + replicas := m.Spec.Master.Replicas rollingUpdatePartition := int32(0) enableServiceLinks := false @@ -40,20 +40,7 @@ func (r *SeaweedReconciler) createMasterStatefulSet(m *seaweedv1.Seaweed) *appsv Labels: labels, }, Spec: corev1.PodSpec{ - /* - Affinity: &corev1.Affinity{ - PodAntiAffinity: &corev1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchLabels: labels, - }, - TopologyKey: "kubernetes.io/hostname", - }, - }, - }, - }, - */ + Affinity: m.Spec.Master.Affinity, EnableServiceLinks: &enableServiceLinks, Containers: []corev1.Container{{ Name: "seaweedfs", @@ -90,8 +77,7 @@ func (r *SeaweedReconciler) createMasterStatefulSet(m *seaweedv1.Seaweed) *appsv "-ec", fmt.Sprintf("sleep 60; weed master -volumePreallocate -volumeSizeLimitMB=1000 %s %s", fmt.Sprintf("-ip=$(POD_NAME).%s-master", m.Name), - fmt.Sprintf("-peers=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333", - m.Name, m.Name, m.Name, m.Name, m.Name, m.Name), + fmt.Sprintf("-peers=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)), ), }, Ports: []corev1.ContainerPort{ diff --git a/controllers/controler_util.go b/controllers/controller_util.go similarity index 100% rename from controllers/controler_util.go rename to controllers/controller_util.go diff --git a/controllers/controller_volume_statefulset.go b/controllers/controller_volume_statefulset.go index 81b9069..c6fa709 100644 --- a/controllers/controller_volume_statefulset.go +++ b/controllers/controller_volume_statefulset.go @@ -2,12 +2,10 @@ package controllers import ( "fmt" - "log" "strings" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -16,18 +14,14 @@ import ( func (r *SeaweedReconciler) createVolumeServerStatefulSet(m *seaweedv1.Seaweed) *appsv1.StatefulSet { labels := labelsForVolumeServer(m.Name) - replicas := int32(m.Spec.VolumeServerCount) + replicas := int32(m.Spec.Volume.Replicas) rollingUpdatePartition := int32(0) enableServiceLinks := false - volumeQuantity := fmt.Sprintf("%dGi", m.Spec.VolumeServerDiskSizeInGiB) volumeCount := int(m.Spec.VolumeServerDiskCount) - quantity, err := resource.ParseQuantity(volumeQuantity) - if err != nil { - log.Fatalf("can not parse quantity %s", volumeQuantity) + volumeRequests := corev1.ResourceList{ + corev1.ResourceStorage: m.Spec.Volume.Requests[corev1.ResourceStorage], } - volumeRequests := make(corev1.ResourceList) - volumeRequests[corev1.ResourceStorage] = quantity // connect all the disks var volumeMounts []corev1.VolumeMount @@ -125,8 +119,7 @@ func (r *SeaweedReconciler) createVolumeServerStatefulSet(m *seaweedv1.Seaweed) fmt.Sprintf("weed volume -port=8444 -max=0 %s %s %s", fmt.Sprintf("-ip=$(POD_NAME).%s-volume", m.Name), fmt.Sprintf("-dir=%s", strings.Join(dirs, ",")), - fmt.Sprintf("-mserver=%s-master-0.%s-master:9333,%s-master-1.%s-master:9333,%s-master-2.%s-master:9333", - m.Name, m.Name, m.Name, m.Name, m.Name, m.Name), + fmt.Sprintf("-mserver=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)), ), }, Ports: []corev1.ContainerPort{ diff --git a/controllers/helper.go b/controllers/helper.go index d57c68f..8e7de1a 100644 --- a/controllers/helper.go +++ b/controllers/helper.go @@ -1,6 +1,15 @@ package controllers -import ctrl "sigs.k8s.io/controller-runtime" +import ( + "fmt" + "strings" + + ctrl "sigs.k8s.io/controller-runtime" +) + +const ( + masterPeerAddressPattern = "%s-master-%d.%s-master:9333" +) func ReconcileResult(err error) (bool, ctrl.Result, error) { if err != nil { @@ -8,3 +17,15 @@ func ReconcileResult(err error) (bool, ctrl.Result, error) { } return false, ctrl.Result{}, nil } + +func getMasterAddresses(name string, replicas int32) []string { + peersAddresses := make([]string, 0, replicas) + for i := int32(0); i < replicas; i++ { + peersAddresses = append(peersAddresses, fmt.Sprintf(masterPeerAddressPattern, name, i, name)) + } + return peersAddresses +} + +func getMasterPeersString(name string, replicas int32) string { + return strings.Join(getMasterAddresses(name, replicas), ",") +}