Merge branch 'master' into peersvc
This commit is contained in:
commit
e7b287bb6f
|
@ -118,6 +118,15 @@ type MasterSpec struct {
|
|||
// +kubebuilder:validation:Minimum=1
|
||||
Replicas int32 `json:"replicas"`
|
||||
Service *ServiceSpec `json:"service,omitempty"`
|
||||
|
||||
// Config in raw toml string
|
||||
Config *string `json:"config,omitempty"`
|
||||
|
||||
VolumePreallocate *bool `json:"volumePreallocate,omitempty"`
|
||||
VolumeSizeLimitMB *int32 `json:"volumeSizeLimitMB,omitempty"`
|
||||
GarbageThreshold *float64 `json:"garbageThreshold,omitempty"`
|
||||
PulseSeconds *int32 `json:"pulseSeconds,omitempty"`
|
||||
DefaultReplication *string `json:"defaultReplication,omitempty"`
|
||||
}
|
||||
|
||||
// VolumeSpec is the spec for volume servers
|
||||
|
@ -140,6 +149,8 @@ type FilerSpec struct {
|
|||
// +kubebuilder:validation:Minimum=1
|
||||
Replicas int32 `json:"replicas"`
|
||||
Service *ServiceSpec `json:"service,omitempty"`
|
||||
// Config in raw toml string
|
||||
Config *string `json:"config,omitempty"`
|
||||
}
|
||||
|
||||
// ComponentSpec is the base spec of each component, the fields should always accessed by the Basic<Component>Spec() method to respect the cluster-level properties
|
||||
|
|
|
@ -132,6 +132,11 @@ func (in *FilerSpec) DeepCopyInto(out *FilerSpec) {
|
|||
*out = new(ServiceSpec)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
if in.Config != nil {
|
||||
in, out := &in.Config, &out.Config
|
||||
*out = new(string)
|
||||
**out = **in
|
||||
}
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FilerSpec.
|
||||
|
@ -154,6 +159,36 @@ func (in *MasterSpec) DeepCopyInto(out *MasterSpec) {
|
|||
*out = new(ServiceSpec)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
if in.Config != nil {
|
||||
in, out := &in.Config, &out.Config
|
||||
*out = new(string)
|
||||
**out = **in
|
||||
}
|
||||
if in.VolumePreallocate != nil {
|
||||
in, out := &in.VolumePreallocate, &out.VolumePreallocate
|
||||
*out = new(bool)
|
||||
**out = **in
|
||||
}
|
||||
if in.VolumeSizeLimitMB != nil {
|
||||
in, out := &in.VolumeSizeLimitMB, &out.VolumeSizeLimitMB
|
||||
*out = new(int32)
|
||||
**out = **in
|
||||
}
|
||||
if in.GarbageThreshold != nil {
|
||||
in, out := &in.GarbageThreshold, &out.GarbageThreshold
|
||||
*out = new(float64)
|
||||
**out = **in
|
||||
}
|
||||
if in.PulseSeconds != nil {
|
||||
in, out := &in.PulseSeconds, &out.PulseSeconds
|
||||
*out = new(int32)
|
||||
**out = **in
|
||||
}
|
||||
if in.DefaultReplication != nil {
|
||||
in, out := &in.DefaultReplication, &out.DefaultReplication
|
||||
*out = new(string)
|
||||
**out = **in
|
||||
}
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MasterSpec.
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
|
||||
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
||||
)
|
||||
|
@ -23,6 +24,10 @@ func (r *SeaweedReconciler) ensureFilerServers(seaweedCR *seaweedv1.Seaweed) (do
|
|||
return
|
||||
}
|
||||
|
||||
if done, result, err = r.ensureFilerConfigMap(seaweedCR); done {
|
||||
return
|
||||
}
|
||||
|
||||
if done, result, err = r.ensureFilerStatefulSet(seaweedCR); done {
|
||||
return
|
||||
}
|
||||
|
@ -34,6 +39,9 @@ func (r *SeaweedReconciler) ensureFilerStatefulSet(seaweedCR *seaweedv1.Seaweed)
|
|||
log := r.Log.WithValues("sw-filer-statefulset", seaweedCR.Name)
|
||||
|
||||
filerStatefulSet := r.createFilerStatefulSet(seaweedCR)
|
||||
if err := controllerutil.SetControllerReference(seaweedCR, filerStatefulSet, r.Scheme); err != nil {
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
_, err := r.CreateOrUpdate(filerStatefulSet, func(existing, desired runtime.Object) error {
|
||||
existingStatefulSet := existing.(*appsv1.StatefulSet)
|
||||
desiredStatefulSet := desired.(*appsv1.StatefulSet)
|
||||
|
@ -51,8 +59,11 @@ func (r *SeaweedReconciler) ensureFilerPeerService(seaweedCR *seaweedv1.Seaweed)
|
|||
log := r.Log.WithValues("sw-filer-peer-service", seaweedCR.Name)
|
||||
|
||||
filerPeerService := r.createFilerPeerService(seaweedCR)
|
||||
_, err := r.CreateOrUpdateService(filerPeerService)
|
||||
if err := controllerutil.SetControllerReference(seaweedCR, filerPeerService, r.Scheme); err != nil {
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
|
||||
_, err := r.CreateOrUpdateService(filerPeerService)
|
||||
log.Info("ensure filer peer service " + filerPeerService.Name)
|
||||
|
||||
return ReconcileResult(err)
|
||||
|
@ -63,6 +74,9 @@ func (r *SeaweedReconciler) ensureFilerService(seaweedCR *seaweedv1.Seaweed) (bo
|
|||
log := r.Log.WithValues("sw-filer-service", seaweedCR.Name)
|
||||
|
||||
filerService := r.createFilerService(seaweedCR)
|
||||
if err := controllerutil.SetControllerReference(seaweedCR, filerService, r.Scheme); err != nil {
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
_, err := r.CreateOrUpdateService(filerService)
|
||||
|
||||
log.Info("ensure filer service " + filerService.Name)
|
||||
|
@ -70,6 +84,16 @@ func (r *SeaweedReconciler) ensureFilerService(seaweedCR *seaweedv1.Seaweed) (bo
|
|||
return ReconcileResult(err)
|
||||
}
|
||||
|
||||
func (r *SeaweedReconciler) ensureFilerConfigMap(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
||||
log := r.Log.WithValues("sw-filer-configmap", seaweedCR.Name)
|
||||
|
||||
filerConfigMap := r.createFilerConfigMap(seaweedCR)
|
||||
_, err := r.CreateOrUpdateConfigMap(filerConfigMap)
|
||||
|
||||
log.Info("Get filer ConfigMap " + filerConfigMap.Name)
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
|
||||
func labelsForFiler(name string) map[string]string {
|
||||
return map[string]string{"app": "seaweedfs", "role": "filer", "name": name}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
package controllers
|
||||
|
||||
import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
||||
)
|
||||
|
||||
func (r *SeaweedReconciler) createFilerConfigMap(m *seaweedv1.Seaweed) *corev1.ConfigMap {
|
||||
labels := labelsForFiler(m.Name)
|
||||
|
||||
toml := ""
|
||||
if m.Spec.Master.Config != nil {
|
||||
toml = *m.Spec.Filer.Config
|
||||
}
|
||||
|
||||
dep := &corev1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: m.Name + "-filer",
|
||||
Namespace: m.Namespace,
|
||||
Labels: labels,
|
||||
},
|
||||
Data: map[string]string{
|
||||
"filer.toml": toml,
|
||||
},
|
||||
}
|
||||
return dep
|
||||
}
|
|
@ -29,6 +29,18 @@ func (r *SeaweedReconciler) createFilerStatefulSet(m *seaweedv1.Seaweed) *appsv1
|
|||
enableServiceLinks := false
|
||||
|
||||
filerPodSpec := m.BaseFilerSpec().BuildPodSpec()
|
||||
filerPodSpec.Volumes = []corev1.Volume{
|
||||
{
|
||||
Name: "filer-config",
|
||||
VolumeSource: corev1.VolumeSource{
|
||||
ConfigMap: &corev1.ConfigMapVolumeSource{
|
||||
LocalObjectReference: corev1.LocalObjectReference{
|
||||
Name: m.Name + "-filer",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
filerPodSpec.EnableServiceLinks = &enableServiceLinks
|
||||
filerPodSpec.Containers = []corev1.Container{{
|
||||
Name: "seaweedfs",
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
|
||||
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
||||
)
|
||||
|
@ -24,6 +25,10 @@ func (r *SeaweedReconciler) ensureMaster(seaweedCR *seaweedv1.Seaweed) (done boo
|
|||
return
|
||||
}
|
||||
|
||||
if done, result, err = r.ensureMasterConfigMap(seaweedCR); done {
|
||||
return
|
||||
}
|
||||
|
||||
if done, result, err = r.ensureMasterStatefulSet(seaweedCR); done {
|
||||
return
|
||||
}
|
||||
|
@ -40,6 +45,9 @@ func (r *SeaweedReconciler) ensureMasterStatefulSet(seaweedCR *seaweedv1.Seaweed
|
|||
if err != nil && errors.IsNotFound(err) {
|
||||
// Define a new deployment
|
||||
dep := r.createMasterStatefulSet(seaweedCR)
|
||||
if err := controllerutil.SetControllerReference(seaweedCR, dep, r.Scheme); err != nil {
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
log.Info("Creating a new master statefulset", "Namespace", dep.Namespace, "Name", dep.Name)
|
||||
err = r.Create(ctx, dep)
|
||||
if err != nil {
|
||||
|
@ -70,15 +78,27 @@ func (r *SeaweedReconciler) ensureMasterStatefulSet(seaweedCR *seaweedv1.Seaweed
|
|||
return ReconcileResult(err)
|
||||
}
|
||||
|
||||
func (r *SeaweedReconciler) ensureMasterConfigMap(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
||||
log := r.Log.WithValues("sw-master-configmap", seaweedCR.Name)
|
||||
|
||||
masterConfigMap := r.createMasterConfigMap(seaweedCR)
|
||||
_, err := r.CreateOrUpdateConfigMap(masterConfigMap)
|
||||
|
||||
log.Info("Get master ConfigMap " + masterConfigMap.Name)
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
|
||||
func (r *SeaweedReconciler) ensureMasterService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
||||
log := r.Log.WithValues("sw-master-service", seaweedCR.Name)
|
||||
|
||||
masterService := r.createMasterService(seaweedCR)
|
||||
if err := controllerutil.SetControllerReference(seaweedCR, masterService, r.Scheme); err != nil {
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
_, err := r.CreateOrUpdateService(masterService)
|
||||
|
||||
log.Info("Get master service " + masterService.Name)
|
||||
return ReconcileResult(err)
|
||||
|
||||
}
|
||||
|
||||
func (r *SeaweedReconciler) ensureMasterPeerService(seaweedCR *seaweedv1.Seaweed) (bool, ctrl.Result, error) {
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
package controllers
|
||||
|
||||
import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
||||
)
|
||||
|
||||
func (r *SeaweedReconciler) createMasterConfigMap(m *seaweedv1.Seaweed) *corev1.ConfigMap {
|
||||
labels := labelsForMaster(m.Name)
|
||||
|
||||
toml := ""
|
||||
if m.Spec.Master.Config != nil {
|
||||
toml = *m.Spec.Master.Config
|
||||
}
|
||||
|
||||
dep := &corev1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: m.Name + "-master",
|
||||
Namespace: m.Namespace,
|
||||
Labels: labels,
|
||||
},
|
||||
Data: map[string]string{
|
||||
"master.toml": toml,
|
||||
},
|
||||
}
|
||||
// Set master instance as the owner and controller
|
||||
// ctrl.SetControllerReference(m, dep, r.Scheme)
|
||||
return dep
|
||||
}
|
|
@ -2,6 +2,7 @@ package controllers
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
|
@ -11,6 +12,30 @@ import (
|
|||
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
||||
)
|
||||
|
||||
func buildMasterStartupScript(m *seaweedv1.Seaweed) string {
|
||||
command := []string{"weed", "master"}
|
||||
spec := m.Spec.Master
|
||||
if spec.VolumePreallocate != nil && *spec.VolumePreallocate {
|
||||
command = append(command, "-volumePreallocate")
|
||||
}
|
||||
|
||||
if spec.VolumeSizeLimitMB != nil {
|
||||
command = append(command, fmt.Sprintf("-volumeSizeLimitMB=%d", *spec.VolumeSizeLimitMB))
|
||||
}
|
||||
|
||||
if spec.GarbageThreshold != nil {
|
||||
command = append(command, fmt.Sprintf("-garbageThreshold=%f", *spec.GarbageThreshold))
|
||||
}
|
||||
|
||||
if spec.PulseSeconds != nil {
|
||||
command = append(command, fmt.Sprintf("-pulseSeconds=%d", *spec.PulseSeconds))
|
||||
}
|
||||
|
||||
command = append(command, fmt.Sprintf("-ip=$(POD_NAME).%s-master", m.Name))
|
||||
command = append(command, fmt.Sprintf("-peers=%s", getMasterPeersString(m.Name, spec.Replicas)))
|
||||
return strings.Join(command, " ")
|
||||
}
|
||||
|
||||
func (r *SeaweedReconciler) createMasterStatefulSet(m *seaweedv1.Seaweed) *appsv1.StatefulSet {
|
||||
labels := labelsForMaster(m.Name)
|
||||
replicas := m.Spec.Master.Replicas
|
||||
|
@ -18,19 +43,35 @@ func (r *SeaweedReconciler) createMasterStatefulSet(m *seaweedv1.Seaweed) *appsv
|
|||
enableServiceLinks := false
|
||||
|
||||
masterPodSpec := m.BaseMasterSpec().BuildPodSpec()
|
||||
masterPodSpec.Volumes = []corev1.Volume{
|
||||
{
|
||||
Name: "master-config",
|
||||
VolumeSource: corev1.VolumeSource{
|
||||
ConfigMap: &corev1.ConfigMapVolumeSource{
|
||||
LocalObjectReference: corev1.LocalObjectReference{
|
||||
Name: m.Name + "-master",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
masterPodSpec.EnableServiceLinks = &enableServiceLinks
|
||||
masterPodSpec.Containers = []corev1.Container{{
|
||||
Name: "seaweedfs",
|
||||
Image: m.Spec.Image,
|
||||
ImagePullPolicy: m.BaseMasterSpec().ImagePullPolicy(),
|
||||
Env: append(m.BaseMasterSpec().Env(), kubernetesEnvVars...),
|
||||
VolumeMounts: []corev1.VolumeMount{
|
||||
{
|
||||
Name: "master-config",
|
||||
ReadOnly: true,
|
||||
MountPath: "/etc/seaweedfs",
|
||||
},
|
||||
},
|
||||
Command: []string{
|
||||
"/bin/sh",
|
||||
"-ec",
|
||||
fmt.Sprintf("sleep 60; weed master -volumePreallocate -volumeSizeLimitMB=1000 %s %s",
|
||||
fmt.Sprintf("-ip=$(POD_NAME).%s-master", m.Name),
|
||||
fmt.Sprintf("-peers=%s", getMasterPeersString(m.Name, m.Spec.Master.Replicas)),
|
||||
),
|
||||
buildMasterStartupScript(m),
|
||||
},
|
||||
Ports: []corev1.ContainerPort{
|
||||
{
|
||||
|
|
|
@ -259,6 +259,27 @@ func (r *SeaweedReconciler) CreateOrUpdateIngress(ingress *extensionsv1beta1.Ing
|
|||
return result.(*extensionsv1beta1.Ingress), nil
|
||||
}
|
||||
|
||||
func (r *SeaweedReconciler) CreateOrUpdateConfigMap(configMap *corev1.ConfigMap) (*corev1.ConfigMap, error) {
|
||||
result, err := r.CreateOrUpdate(configMap, func(existing, desired runtime.Object) error {
|
||||
existingConfigMap := existing.(*corev1.ConfigMap)
|
||||
desiredConfigMap := desired.(*corev1.ConfigMap)
|
||||
|
||||
if existingConfigMap.Annotations == nil {
|
||||
existingConfigMap.Annotations = map[string]string{}
|
||||
}
|
||||
for k, v := range desiredConfigMap.Annotations {
|
||||
existingConfigMap.Annotations[k] = v
|
||||
}
|
||||
existingConfigMap.Labels = desiredConfigMap.Labels
|
||||
existingConfigMap.Data = desiredConfigMap.Data
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result.(*corev1.ConfigMap), nil
|
||||
}
|
||||
|
||||
// EmptyClone create an clone of the resource with the same name and namespace (if namespace-scoped), with other fields unset
|
||||
func EmptyClone(obj runtime.Object) (runtime.Object, error) {
|
||||
meta, ok := obj.(metav1.Object)
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
|
||||
seaweedv1 "github.com/seaweedfs/seaweedfs-operator/api/v1"
|
||||
)
|
||||
|
@ -19,7 +20,7 @@ func (r *SeaweedReconciler) ensureVolumeServers(seaweedCR *seaweedv1.Seaweed) (d
|
|||
return
|
||||
}
|
||||
|
||||
if done, result, err = r.ensureVolumeServerService(seaweedCR); done {
|
||||
if done, result, err = r.ensureVolumeServerStatefulSet(seaweedCR); done {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -34,6 +35,9 @@ func (r *SeaweedReconciler) ensureVolumeServerStatefulSet(seaweedCR *seaweedv1.S
|
|||
log := r.Log.WithValues("sw-volume-statefulset", seaweedCR.Name)
|
||||
|
||||
volumeServerStatefulSet := r.createVolumeServerStatefulSet(seaweedCR)
|
||||
if err := controllerutil.SetControllerReference(seaweedCR, volumeServerStatefulSet, r.Scheme); err != nil {
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
_, err := r.CreateOrUpdate(volumeServerStatefulSet, func(existing, desired runtime.Object) error {
|
||||
existingStatefulSet := existing.(*appsv1.StatefulSet)
|
||||
desiredStatefulSet := desired.(*appsv1.StatefulSet)
|
||||
|
@ -63,6 +67,9 @@ func (r *SeaweedReconciler) ensureVolumeServerService(seaweedCR *seaweedv1.Seawe
|
|||
log := r.Log.WithValues("sw-volume-service", seaweedCR.Name)
|
||||
|
||||
volumeServerService := r.createVolumeServerService(seaweedCR)
|
||||
if err := controllerutil.SetControllerReference(seaweedCR, volumeServerService, r.Scheme); err != nil {
|
||||
return ReconcileResult(err)
|
||||
}
|
||||
_, err := r.CreateOrUpdateService(volumeServerService)
|
||||
|
||||
log.Info("ensure volume service " + volumeServerService.Name)
|
||||
|
|
1
go.sum
1
go.sum
|
@ -769,6 +769,7 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
|
|||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
k8s.io/api v0.18.2 h1:wG5g5ZmSVgm5B+eHMIbI9EGATS2L8Z72rda19RIEgY8=
|
||||
k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
|
||||
k8s.io/api v0.19.3 h1:GN6ntFnv44Vptj/b+OnMW7FmzkpDoIDLZRvKX3XH9aU=
|
||||
k8s.io/apiextensions-apiserver v0.18.2 h1:I4v3/jAuQC+89L3Z7dDgAiN4EOjN6sbm6iBqQwHTah8=
|
||||
k8s.io/apiextensions-apiserver v0.18.2/go.mod h1:q3faSnRGmYimiocj6cHQ1I3WpLqmDgJFlKL37fC4ZvY=
|
||||
k8s.io/apimachinery v0.18.2 h1:44CmtbmkzVDAhCpRVSiP2R5PPrC2RtlIv/MoB8xpdRA=
|
||||
|
|
Loading…
Reference in New Issue